Spark机器学习库之线性回归

前言:ML是Spark里面的机器学习库。它使得基础的一些机器学习算法变得简单易用。Spark提供了常规的机器学习算法,比如说分类、回归、聚类、协同过滤算法。还包含了类似于sk-learn中的一些特征提取以及特征选择。

现在我们基于Spark2.21这个版本,主要是介绍基于DataFrame的机器学习该如何实现,基本参照官网上的例子。来实现一遍,以便学习。在介绍一下相关的算法。

首先我们先介绍一些什么是线性回归,是根据自变量x1,x2...xn以及应变量y的实际样本,去估计两者之间近似的函数关系f,类似于我们有三个点(1,1) (2,2) (3,3)代表的是xy,我们需要找到一个函数来满足这三个点的一个f,很明显我们可以找到是y=x。当自变量x只有一个时,我们称之为一元线性回归,多个就是多元线性回归。

回归是一种监督学习,监督学习分两种,一种是回归一种就是分类,回归主要是用于预测连续值的比如说根据特征预测房价呀,分类就是类别判断,比如说垃圾邮件识别。

线性回归的表示方式是:

f(x) = w1X1 + w2X2 + ...+wnXn+b

其实就是我们初中学习的一元方程式。w1是斜率,b是截距。

我们需要做的就是找出最优的w值和b值,使得我们的预测值误差最小。

举个简单的列子:

房价预测,我们收集了一些房价的信息,包括房子的大小(hd)、房子的地理位置(hs)、房子的房间个数(hr)、房子的楼层(hl)等等。然后我们收集了它对应的房价。

我们就需要找到一个函数,通过这些已知的数据。

f(x) = w1*hd + w2*hs + w3*hr + w4*hl +..+ b

通过已知数据慢慢拟合找到最合适的w组以及b。

然后就是优化,一般的优化我们经常才有梯度下降的方式去优化。最小二乘法。

大概就是这个样子。然后我们就是用Spark来简单的演练一下吧。

Spark的实现

我们先看看源码:这是Spark的一个优化W和b的过程

/**

* Add a new training instance to this LeastSquaresAggregator, and update the loss and gradient

* of the objective function.

*

*@param instanceThe instance of data point to be added.

*@returnThis LeastSquaresAggregator object.

*/

defadd(instance: Instance):this.type= {

instancematch{caseInstance(label,weight,features) =>

if(weight ==0.0)return this

valdiff =dot(features,effectiveCoefficientsVector) - label / labelStd +offset

if(diff !=) {

vallocalGradientSumArray =gradientSumArray

vallocalFeaturesStd =featuresStd

features.foreachActive { (index,value) =>

if(localFeaturesStd(index) !=0.0&& value !=0.0) {

localGradientSumArray(index) += weight * diff * value / localFeaturesStd(index)

}

}

lossSum+= weight * diff * diff /2.0

}

totalCnt+=1

weightSum+= weight

this

}

}

然后还有个预测:

override protected defpredict(features: Vector):Double= {

dot(features,coefficients) + intercept

}

就是w*x + b

保存:

override protected defsaveImpl(path:String):Unit= {

// Save metadata and Params

DefaultParamsWriter.saveMetadata(instance,path,sc)

// Save model data: intercept, coefficients

valdata =Data(instance.intercept,instance.coefficients)

valdataPath =newPath(path,"data").toString

sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)

}

}

保存的就是w和b两个变量,所以说我们在外面看着很陌生,一旦深入进去,就觉得很简单了,这就是在求一个变量的过程。训练结果也就是我们所谓的模型其实就是这些个变量。

然后我们用Spark的例子来实现一下吧。

packageorg.apache.spark.examples

importorg.apache.spark.sql.SparkSession

importorg.apache.spark.ml.regression.LinearRegression

importorg.apache.spark.ml.evaluation.RegressionEvaluator

objectSparkDemo {

defmain(args: Array[String]) {

valspark = SparkSession

.builder

.appName("LinearRegression").master("local")

.getOrCreate()

// Load training data

valtraining = spark.read.format("libsvm").load("file:\\D:\\sample_linear_regression_data.txt")

valArray(trainData,testData) = training.randomSplit(Array(0.8,0.2))

//模型设置参数

vallr =newLinearRegression().setMaxIter(10)

.setRegParam(0.3)

.setElasticNetParam(0.8)

//建立模型

vallrModel = lr.fit(trainData)

//打印学习到的参数

println(s"Coefficients:$Intercept:$")

//打印一些模型的参数

valtrainingSummary = lrModel.summary

println(s"numIterations:$")

println(s"objectiveHistory: [$]")

trainingSummary.residuals.show()

println(s"RMSE:$")

println(s"r2:$")

//预测数据

valpredict = lrModel.transform(testData)

predict.show(3)

//显示测试数据的指标

println("test rmse:"+newRegressionEvaluator().setMetricName("rmse").evaluate(predict))

println("test mse:"+newRegressionEvaluator().setMetricName("mse").evaluate(predict))

println("test r2:"+newRegressionEvaluator().setMetricName("r2").evaluate(predict))

println("test mae:"+newRegressionEvaluator().setMetricName("mae").evaluate(predict))

spark.stop()

}

}

// scalastyle:on println

我们来看看里面对应的输出:

差不多这样就在Spark上实现了一个线性回归。但是呢其中还是有很多参数是需要我们进一步去调试跟进的。

下面我在介绍一下该算法在Spark中的一些参数吧。

setMaxIter()设置迭代次数。默认是100.

@Since("1.3.0")

defsetMaxIter(value:Int):this.type= set(maxIter,value)

setDefault(maxIter->100)

setTol() 设置容错,当我们随着迭代次数的增加,误差值会越来越小,当误差值下于该设置的值时,就会停止迭代。默认1E-6

@Since("1.4.0")

defsetTol(value: Double):this.type= set(tol,value)

setDefault(tol ->1E-6)

setRegParam() 设置正则化项系数,正则化项主要用于防止过拟合现象,当数据集比较小,特征维数又比较多的时候,易出现过拟合,此时可以考虑增大正则化项系数 默认0.0

@Since("1.3.0")

defsetRegParam(value:Double):this.type= set(regParam,value)

setDefault(regParam->0.0)

setElasticNetParam() 正则化范式比(默认0.0),正则化一般有两种范式:L1和L2正则。L1一般用于特征的稀疏化,L2一般用于防止过拟合。这里的参数即设置L1范式的占比,默认0.0即只使用L2范式

@Since("1.4.0")

defsetElasticNetParam(value:Double):this.type= set(elasticNetParam,value)

setDefault(elasticNetParam->0.0)

setFeaturesCol() 设置特征列 默认是“features”

setLabelCol() 设置标签列 默认是“label”

setWeightCol() 设置权重,这里是设个各个参数的权重比,默认都为1。

@Since("1.6.0")

defsetWeightCol(value:String):this.type= set(weightCol,value)

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180415G17DN300?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

同媒体快讯

扫码关注云+社区

领取腾讯云代金券