前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark 中的机器学习库

PySpark 中的机器学习库

作者头像
MeteoAI
发布2019-07-24 16:10:57
3.3K0
发布2019-07-24 16:10:57
举报
文章被收录于专栏:MeteoAI

Spark 机器学习库的产生背景

传统的机器学习算法,由于技术和单机存储的限制,比如使用scikit-learn,只能在少量数据上使用。即以前的统计/机器学习依赖于数据抽样。但实际过程中样本往往很难做好随机,导致学习的模型不是很准确,在测试数据上的效果也可能不太好。随着 HDFS(Hadoop Distributed File System) 等分布式文件系统出现,存储海量数据已经成为可能。在全量数据上进行机器学习也成为了可能,这顺便也解决了统计随机性的问题。然而,由于 MapReduce 自身的限制,使得使用 MapReduce 来实现分布式机器学习算法非常耗时和消耗磁盘IO。因为通常情况下机器学习算法参数学习的过程都是迭代计算的,即本次计算的结果要作为下一次迭代的输入,这个过程中,如果使用 MapReduce,我们只能把中间结果存储磁盘,然后在下一次计算的时候从新读取,这对于迭代频发的算法显然是致命的性能瓶颈。引用官网一句话:Apache Spark™ is a unified analytics engine for large-scale data processing.Spark, 是一种"One Stack to rule them all"的大数据计算框架,期望使用一个技术堆栈就完美地解决大数据领域的各种计算任务.

在大数据上进行机器学习,需要处理全量数据并进行大量的迭代计算,这要求机器学习平台具备强大的处理能力。Spark立足于内存计算,天然的适应于迭代式计算。即便如此,对于普通开发者来说,实现一个分布式机器学习算法仍然是一件极具挑战的事情。幸运的是,Spark提供了一个基于海量数据的机器学习库,它提供了常用机器学习算法的分布式实现,开发者只需要有 Spark 基础并且了解机器学习算法的原理,以及方法相关参数的含义,就可以轻松的通过调用相应的 API 来实现基于海量数据的机器学习过程。把机器学习作为一个模块加入到Spark中,也是大势所趋。

为了支持Spark和Python,Apache Spark社区发布了PySpark 。提供了一个Python_Shell,从而可以以交互的方式使用Python编写Spark程序,如下图。

Spark使用Spark RDD、 Spark SQL、 Spark Streaming、 MLlib、 GraphX成功解决了大数据领域中, 离线批处理、 交互式查询、 实时流计算、 机器学习与图计算等最重要的任务和问题。

Spark Session 与SparkContext

SparkSession是Spark2.0新引入的概念,为用户提供了统一的切入点,来让用户学习Spark的各项功能,其作为DataFrame和DataSet的API的切入点,内部封装了SparkConf、SparkContext和SQLContext。

在Spark的早期版本(Spark1.x)中,SparkContext是Spark的主要切入点。在当时,RDD是Spark主要的API,可以直接通过SparkContext来创建和操作RDD,但对于其他的API,则需要使用不同的context。如:对于sql,使用SQLContext;对于hive,使用hiveContext;对于Streaming,使用StreamingContext。但是随着版本的迭代,DataFrame和DataSet的API逐渐成为标准的API,就需要为它们建立新的切入点.

真假美猴王之mllib与ml

目前,Spark 中有两个机器学习库,ml和 mllib的主要区别和联系如下:
  • ml和mllib都是Spark中的机器学习库,目前常用的机器学习功能2个库都能满足需求。
  • spark官方推荐使用ml,因为ml功能更全面更灵活,未来会主要支持ml,mllib很有可能会被废弃(据说可能是在spark3.0中deprecated)。
  • ml主要操作的是DataFrame, 而mllib操作的是RDD,也就是说二者面向的数据集不一样。

不愿雨露均沾的ml

下面主要将基于DataFrams的Spark机器学习包,spark.ml:

从顶层上看,ml包主要包含三大抽象类:转换器、预测器和工作流。

转换器(Transformer):

从Transformer抽象类派生出来的每一个新的Transformer都需要实现一个.transform(…) 方法,该方法可以将一个DataFrame转换成另一个DataFrame。 在spark.ml.feature中有许多Transformer: Binarizer :给定一个阈值,该方法需要一个连续的变量将其转换为二进制。 Bucketizer:分箱(分段处理):将连续数值转换为离散类别比如特征是年龄,是一个连续数值,需要将其转换为离散类别(未成年人、青年人、中年人、老年人),就要用到Bucketizer了。 ChiSqSelector:对于分类目标变量(考虑到分类模型),此方法允许你预定义数量的特征(通过numTopFeatures参数指定)。 选择完成后,如方法的名称所示,使用卡方检验。 需要两步:首先,你需要.fit(…) 数据(为了这个方法可以计算卡方检验)。然后,调用.fit(…)方法(将你的DataFrame作为参数传递)返回一个可以用.transform(…)转换的ChiSqSelectorModel对象。 CountVectorizer:将文本文档转换为单词计数的向量。当不存在先验字典时,Countvectorizer作为Estimator提取词汇进行训练,并生成一个CountVectorizerModel用于存储相应的词汇向量空间。该模型产生文档关于词语的稀疏表示,其表示可以传递给其他算法, HashingTF : 生成词频率向量。它采用词集合并将这些集合转换成固定长度的特征向量。在文本处理中,“一组词”可能是一袋词。 HashingTF使用散列技巧。通过应用散列函数将原始要素映射到索引,然后基于映射的索引来计算项频率。 IDF : 此方法计算逆文档频率。需要注意的是文本首先要用向量表示,可以用HashingTF 或者 CountVectorizer。 MinMaxScaler:最大-最小规范化,将所有特征向量线性变换到用户指定最大-最小值之间。但注意在计算时还是一个一个特征向量分开计算的。通常将最大,最小值设置为1和0,这样就归一化到[0,1]。Spark中可以对min和max进行设置,默认就是[0,1]。 MaxAbsScaler:同样对某一个特征操作,各特征值除以最大绝对值,因此缩放到[-1,1]之间。且不移动中心点。不会将稀疏矩阵变得稠密。 Normalizer : 将某个特征向量(由所有样本某一个特征组成的向量)计算其p-范数,然后对该每个元素除以p-范数。将原始特征Normalizer以后可以使得机器学习算法有更好的表现。(默认是L2)。 Word2Vec:该方法将一个句子(字符串)作为输入,并将其转换为{string,vector}格式的映射,这种格式在自然语言处理中非常有用。 IndexToString:有StringIndexer,就应该有IndexToString。在应用StringIndexer对labels进行重新编号后,带着这些编号后的label对数据进行了训练,并接着对其他数据进行了预测,得到预测结果,预测结果的label也是重新编号过的,因此需要转换回来。

预测器(Estimators):

预测器可以被认为是需要评估的统计模型,来进行预测或对观测结果进行分类。如果派生自抽象的Estimator类,则新模型必须实现.fit(…)方法,该方法给DataFrame中的数据以及一些默认或用户指定的参数泛化模型。 1、分类 ml包提供了七种分类模型,这里介绍四种常用的模型。 LogisticRegression:逻辑回归是分类的基本模型。逻辑回归使用logit函数来计算观测到属于特定类别的概率。 DecisionTreeClassifier :构建一棵决策树以预测观察类别的分类器。maxDepth指定参数限制树的生长深度,minInstancePerNode确定进一步拆分所需的树节点中观察值的最小数目,maxBins参数指定连续变量将被分割的最大数量的区间, impurity 指定测量和计算来自分割的信息增益的度量。 RandomForestClassifier:这个模型产生多个决策树(因此称为森林),并使用这些决策树的模式输出分类结果。 RandomForestClassifier支持二元和多元标签。 NaiveBayes:基于贝叶斯定理,这个模型使用条件概率来分类观测。 PySpark ML中的NaiveBayes模型支持二元和多元标签。 2、回归 PySpark ML包中有七种模型可用于回归任务。这里只介绍两种模型,如后续需要用可查阅官方手册。 LinearRegression:最简单的回归模型,它假定了特征和连续标签之间的线性关系,以及误差项的正态性。 DecisionTreeRegressor:与分类模型类似,标签是连续的而不是二元或多元的。 3、聚类 聚类是一种无监督的模型。PySpark ML包提供了四种模型。 BisectingKMeans :k-means 聚类和层次聚类的组合。该算法以单个簇中的所有观测值开始,并将数据迭代地分成k个簇。 KMeans : 将数据分成k个簇,随机生成k个初始点作为质心,将数据集中的数据按照距离质心的远近分到各个簇中,将各个簇中的数据求平均值,作为新的质心,重复上一步,直到所有的簇不再改变。 GaussianMixture:这个方法使用k个未知的高斯分布参数来剖析数据集。使用期望最大化算法,通过最大化对数似然函数来找到高斯参数。 LDA:此模型用于自然语言处理应用程序中的主题建模。

管道/工作流(Pipeline):

Spark ML Pipeline 的出现,是受到了 scikit-learn 项目的启发,并且总结了 MLlib 在处理复杂机器学习问题上的弊端,旨在向用户提供基于 DataFrame 之上的更加高层次的 API 库,以更加方便的构建复杂的机器学习工作流式应用。pipeline将多个Transformer和Estimator串成一个特定的ML Wolkflow,一个 Pipeline 在结构上会包含一个或多个 PipelineStage,每一个 PipelineStage 都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的 PipelineStage 在 ML 里按照处理问题类型的不同都有相应的定义和实现。

借助于Pipeline,在Spark上进行机器学习的数据流向更加清晰,同时每一个stage的任务也更加明了,因此,无论是在模型的预测使用上、还是模型后续的改进优化上,都变得更加容易。

基于PySpak.ml的GBDT算法分类任务实现

代码语言:javascript
复制
#加载相关库
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer
from numpy import allclose
from pyspark.sql.types import *
from pyspark.sql import Row,functions
from pyspark.ml.linalg import Vector,Vectors
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer,HashingTF, Tokenizer

定义一个函数,对特征和标签的类型进行处理,特征改为浮点型,标签改为字符型。

代码语言:javascript
复制

def f(x):
    rel = {}
    rel['features'] = Vectors.dense(float(x[]),float(x[]),float(x[]),float(x[]))
    rel['label'] = str(x[])
    return rel
代码语言:javascript
复制
#读取本地数据
data = spark.sparkContext.textFile('/home/lee/桌面/myjiris.txt').\
map(lambda line: line.split(',')).\
map(lambda p: Row(**f(p))).\
toDF()

data.createOrReplaceTempView("iris")
df = spark.sql("select * from iris where label != 'Iris-setosa'")
rel = df.rdd.map(lambda t : str(t[])+":"+str(t[])).collect()   #新版本要显示调用 ,这一行现在加了.rdd

labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)
featureIndexer = VectorIndexer().\
setInputCol("features").\
setOutputCol("indexedFeatures").\
fit(df)
#打乱样本,切分出训练集,测试集
trainingData, testData = df.randomSplit([0.7,0.3])

trainingSet =trainingData
train_num = trainingSet.count()
print("训练样本数:{}".format(train_num))

#使用GBDT进行训练
stringIndexer = StringIndexer(inputCol="label", outputCol="indexed")
si_model = stringIndexer.fit(trainingSet)
tf = si_model.transform(trainingSet)
gbdt = GBTClassifier(maxIter=, maxDepth=, labelCol="indexed", seed=)
gbdtModel = gbdt.fit(tf)

#测试
#data = spark.sql("""select * from XXX""")
#构造测试数据集
testSet=testData
#testSet = data.rdd.map(list).map(lambda x:Row(label=x[-1], features=Vectors.dense(x[:-1]))).toDF()
print("测试样本数:{}".format(testSet.count()))
#print(testSet.show())
si_model = stringIndexer.fit(testSet)
test_tf = si_model.transform(testSet)
result = gbdtModel.transform(test_tf)
#result.show()
#分类效果评估
total_amount=result.count()
correct_amount = result.filter(result.indexed==result.prediction).count()
precision_rate = correct_amount/total_amount
print("预测准确率为:{}".format(precision_rate))
positive_precision_amount = result.filter(result.indexed == ).filter(result.prediction == ).count()
negative_precision_amount = result.filter(result.indexed == ).filter(result.prediction == ).count()
positive_false_amount = result.filter(result.indexed == ).filter(result.prediction == ).count()
negative_false_amount = result.filter(result.indexed == ).filter(result.prediction == ).count()
print("正样本预测准确数量:{},负样本预测准确数量:{}".format(positive_precision_amount,negative_precision_amount))
positive_amount = result.filter(result.indexed == ).count()
negative_amount = result.filter(result.indexed == ).count()
print("正样本数:{},负样本数:{}".format(positive_amount,negative_amount))
print("正样本预测错误数量:{},负样本预测错误数量:{}".format(positive_false_amount,negative_false_amount))
recall_rate1 = positive_precision_amount/positive_amount
recall_rate2 = negative_precision_amount/negative_amount
print("正样本召回率为:{},负样本召回率为:{}".format(recall_rate1,recall_rate2))
代码语言:javascript
复制
-------------------------------------------------



本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 MeteoAI 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark 机器学习库的产生背景
  • Spark Session 与SparkContext
  • 真假美猴王之mllib与ml
    • 目前,Spark 中有两个机器学习库,ml和 mllib的主要区别和联系如下:
    • 不愿雨露均沾的ml
      • 转换器(Transformer):
        • 预测器(Estimators):
          • 管道/工作流(Pipeline):
          • 基于PySpak.ml的GBDT算法分类任务实现
          相关产品与服务
          联邦学习
          联邦学习(Federated Learning,FELE)是一种打破数据孤岛、释放 AI 应用潜能的分布式机器学习技术,能够让联邦学习各参与方在不披露底层数据和底层数据加密(混淆)形态的前提下,通过交换加密的机器学习中间结果实现联合建模。该产品兼顾AI应用与隐私保护,开放合作,协同性高,充分释放大数据生产力,广泛适用于金融、消费互联网等行业的业务创新场景。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档