首页
学习
活动
专区
工具
TVP
发布

Spark Mllib源码分析之线性回归

线性回归简介

线性回归是利用数理统计中回归分析,来确定两种或两种以上变量间相互依赖的定量关系的一种统计分析方法,运用十分广泛。其表达形式如下:

Hθ(x) = θ+ θ1x1+ θ2x2+ … + θnxn

从线性代数角度,上面的公司可以看做两个向量相乘,即

Hθ(x) = θTX

机器学习算法要训练的,就是参数集θT

建立模型后,我们需要给模型一个优化目标,使得训练后得到的参数能够让预测值尽可能地接近真实值Y。这里我们引入损失函数(Loss Function,或Cost Function)这个概念。损失函数输出一个非负的实值。这个实值通常用来反映模型误差的大小。线性回归的损失函数通常用1/2*(θTX-Y)2来表示。其意义是当前模型的估计值与真实值之差的平方和,乘上1/2 是为了求导的时候保证系数为1。

有了损失函数,求θT的问题,就演变成了求1/2*(θTX-Y)2的极小值的问题,spark中使用的是梯度下降法,梯度的计算方法是X*(θTX-Y),即损失函数对θT求偏导。

代码分析

Spark 用如下方式训练线性回归模型:

LinearRegressionWithSGD.train(Data, iterations,stepSize),这个函数位于LinearRegressionWithSGD的伴生对象中,因此是一个可直接调用的方法。其中, Data是与源数据相关的RDD,iterations 表示迭代的次数,stepSize表示初始的迭代步长,返回的结果是一个线性回归模型LinearRegressionModel。

Train函数内部,会创建一个LinearRegressionWithSGD对象,并调用它的父类GeneralizedLinearAlgorithm的run方法。

Run方法中,最重要的部分是 optimizer.optimize(data,initialWeightsWithIntercept), 通过反复迭代得到最终参数集θT的运算就是在此处执行。这个optimizer的实现类是GradientDescent (梯度下降),是在创建LinearRegressionWithSGD对象的时候被指定GradientDescent 的optimize函数调用了 GradientDescent的runMiniBatchSGD函数,线性回归模型训练最重要的流程都在runMiniBatchSGD函数的这个循环里,其中numIterations就是循环迭代的次数

while(!converged && i

……

}

我仔细看一下这个循环体里的内容:

把当前的权重作为广播变量广播到各个worker上,作为gradient.compute函数的输入:

对数据做采样(因为miniBatchFraction默认是1.0,所以实际上得到的是全体数据),然后对采样的数据,调用treeAggregate方法,在每个worker上,做分布式计算,得到最终把计算得到的梯度之和(gradientSum),损失之和(lossSum),处理的数据的条数(miniBatchSize)返回到driver端:

val(gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction,42 + i)

.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(

seqOp = (c, v) => {

// c: (grad, loss, count), v: (label, features)

val l = gradient.compute(v._2, v._1, bcWeights.value,Vectors.fromBreeze(c._1))

(c._1, c._2 + l, c._3 + 1)

},

combOp = (c1, c2) => {

// c: (grad, loss, count)

(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)

})

在treeAggregate里,需要自己实现两个spark预先定义好的函数:seqOp在excutor端执行,combOp在driver端执行(实验证明,如果数据少,只有一个excutor在运算的话,combOp不会执行,因为不需要聚合)。

来看看在excutor端执行的最重要的函数seqOp的实现

(c, v) => {

// c: (grad, loss, count), v: (label, features)

val l = gradient.compute(v._2,v._1, bcWeights.value, Vectors.fromBreeze(c._1))

(c._1, c._2 + l, c._3 + 1)

}

当前数据v(v由2部分组成,v._1对应Y值,v._2对应向量X,由输入的源数据解析得到), 当前梯度的累加值(c._1)和广播过来的权重bcWeights,以上四项作为gradient.compute的参数,计算出当前数据的损失函数的值l (计算公式是1/2*(θTX-Y)2),在这个函数调用的过程中,梯度的累加值会被更新。计算出l后,要与之前的损失值之和相加,得到新的损失值之和。

在来看看gradient.compute函数。这里gradient的实现类是LeastSquaresGradient,同样是在创建LinearRegressionWithSGD对象的时候指定。

override defcompute(

data: Vector, label: Double,

weights:Vector, cumGradient: Vector): Double = {

// 把当前行数据的X和权重θT作向量乘法,再减去当前行数据的Y,得到两者值的差,即diff的值是θTX-Y

val diff =dot(data, weights) - label

// 用diff值和data值相乘,即X*(θTX-Y)

// 然后再把得到的梯度和之前梯度之和相加,得到新的梯度之和axpy(diff, data, cumGradient)

// 返回损失函数1/2*(θTX-Y)2

diff * diff / 2.0

}

}

对于抽样出的所有数据,都会被依次调用gradient.compute 函数,并把累加的损失值lossSum,累加的梯度gradientSum,处理数据的条数miniBatchSize 作为计算结果返回给driver端。当driver端得到数据后,线性回归算法会根据这几个数来做一系列的更新操作,包括更新当前权重(最重要),更新历史损失量数组stochasticLossHistory,更新前一次权重previousWeights,更新当前权重currentWeights等。

if(miniBatchSize > 0) {

stochasticLossHistory.append(lossSum / miniBatchSize + regVal)

// 重要的操作,后面会详细说明

val update = updater.compute(

weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),

stepSize, i, regParam)

weights = update._1

regVal = update._2

// 更新previousWeights和currentWeights

previousWeights = currentWeights

currentWeights = Some(weights)

if (previousWeights != None && currentWeights != None) {

converged = isConverged(previousWeights.get,

currentWeights.get, convergenceTol)

}

} else {

logWarning(s"Iteration ($i/$numIterations). The size of sampledbatch is zero")

}

i += 1 // 更新循环次数,开始下一次循环,直到它大于numIterations

上面代码片段中最重要的更新操作,是调用updater.compute 对当前的权重进行更新。更新后的权重,会在下次循环中,通过前面提到的广播变量的方式,发送到excutor端,供treeAggregate 里的 gradient.compute 函数使用。updater的实现类是SimpleUpdater,同样是在创建LinearRegressionWithSGD对象的时候指定的。

我们来详细看一下SimpleUpdater的compute函数:

override def compute(

weightsOld: Vector,gradient: Vector,

stepSize: Double,iter: Int,

regParam: Double): (Vector, Double) = {

// 计算权重更新的步长

val thisIterStepSize = stepSize / math.sqrt(iter)

val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector

brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)

(Vectors.fromBreeze(brzWeights), 0)

}

这里实现的就是权重更新的公式:

其中W是当前的权重, gradient的值是 gradientSum 除以miniBatchSize 得到值。iter就是当前循环的次数,因为stepSize的值是固定的,所以在循环的初期,W更新得会比较快,二随着循环次数的增加,W的更新速度会变慢。

以上就是runMiniBatchSGD中while循环内部的逻辑。循环结束后,runMiniBatchSGD 返回迭代后最终得到的 weights,weights会在GeneralizedLinearAlgorithm的run函数中被使用,作为createModel 函数的参数:

createModel(weights,intercept)

这个函数返回一个LinearRegressionModel,作为模型训练的结果

以上就是spark从原始数据训练出线性回归模型的整个步骤。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180126B0U8YV00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券