前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【技术分享】保序回归

【技术分享】保序回归

原创
作者头像
腾讯云TI平台
修改2020-02-26 15:15:57
1.8K0
修改2020-02-26 15:15:57
举报
文章被收录于专栏:腾讯云TI平台腾讯云TI平台

本文原作者:尹迪,经授权后发布。

1 保序回归

  保序回归解决了下面的问题:给定包含n个数据点的序列 y_1,y_2,...,y_n , 怎样通过一个单调的序列 beta_1,beta_2,...,beta_n 来归纳这个问题。形式上,这个问题就是为了找到

1.1
1.1

  大部分时候,我们会在括号前加上权重w_i。解决这个问题的一个方法就是 pool adjacent violators algorithm(PAVA) 算法。粗略的讲,PAVA算法的过程描述如下:

  我们从左边的y_1开始,右移y_1直到我们遇到第一个违例(violation)即y_i < y_i+1,然后,我们用违例之前的所有y的平方替换这些y,以满足单调性。我们继续这个过程,直到我们最后到达y_n

2 近似保序

  给定一个序列y_1,y_2,...,y_n,我们寻找一个近似单调估计,考虑下面的问题

1.2
1.2

  在上式中,X_+表示正数部分,即X_+ = X.1 (x>0)。这是一个凸优化问题,处罚项处罚违反单调性(即beta_i > beta_i+1)的邻近对。

  在公式(2)中,隐含着一个假设,即使用等距的网格测量数据点。如果情况不是这样,那么可以修改惩罚项为下面的形式

1.3
1.3

x_i表示y_i测量得到的值。

3 近似保序算法流程

  这个算法是标准PAVA算法的修改版本,它并不从数据的左端开始,而是在需要时连接相邻的点,以产生近似保序最优的顺序。相比一下,PAVA对中间的序列并不特别感兴趣,只关心最后的序列。

  有下面一个引理成立。

1.4
1.4

  这个引理证明的事实极大地简化了近似保序解路径(solution path)的构造。假设在参数值为lambda的情况下,有K_lambda个连接块,我们用A_1,A_2,..,A_K_lambda表示。这样我们可以重写(2)为如下(3)的形式。

上面的公式,对beta求偏导,可以得到下面的次梯度公式。通过这个公式即可以求得beta

 为了符合方便,令s_0 = s_K_lambda = 0。并且,

  现在假设,当lambda在一个区间内增长时,组A_1,A_2,...,A_K_lambda不会改变。我们可以通过相应的lambda区分(4)。

1.8
1.8

  这个公式的值本身是一个常量,它意味着上式的betalambda的线性函数。

  随着lambda的增长,方程(5)将连续的给出解决方案的斜率直到组A_1,A_2,...,A_K_lambda改变。更加引理1,只有两个组合并时,这才会发生。m_i表示斜率,那么对于每一个i=1,...,K_lambda - 1A_iA_i+1合并之后得到的公式如下

  因此我们可以一直移动,直到lambda “下一个”值的到来

  并且合并A_i^starA_i^star+1,其中

  注意,可能有超过一对组别到达了这个最小值,在这种情况下,会组合所有满足条件的组别。公式(7)和(8)成立的条件是t_i,i+1大于lambda,如果没有t_i,i+1大于lambda,说明没有组别可以合并,算法将会终止。

算法的流程如下

  • 初始时,lambda=0K_lambda=n,A_i={i},i=1,2,...,n。对于每个i,解是beta_lambda,i = y_i
  • 重复下面过程

  **1、**通过公式(5)计算每个组的斜率m_i

  **2、**通过公式(6)计算没对相邻组的碰撞次数t_i,i+1

  **3、**如果t_i,i+1 < lambda,终止

  **4、**计算公式(7)中的临界点lambda^star,并根据斜率更新解

  对于每个i,根据公式(8)合并合适的组别(所以K_lambda^star = K_lambda - 1),并设置lambda = lambda^star

4 源码分析

  在1.6.x版本中,并没有实现近似保序回归,后续会实现。现在我们只介绍一般的保序回归算法实现。

4.1 实例

代码语言:javascript
复制
import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel}
val data = sc.textFile("data/mllib/sample_isotonic_regression_data.txt")
// 创建(label, feature, weight) tuples ,权重默认设置为1.0
val parsedData = data.map { line =>
  val parts = line.split(',').map(_.toDouble)
  (parts(0), parts(1), 1.0)
}
// Split data into training (60%) and test (40%) sets.
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)
// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
val model = new IsotonicRegression().setIsotonic(true).run(training)
// Create tuples of predicted and real labels.
val predictionAndLabel = test.map { point =>
  val predictedLabel = model.predict(point._2)
  (predictedLabel, point._1)
}
// Calculate mean squared error between predicted and real labels.
val meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()
println("Mean Squared Error = " + meanSquaredError)

4.2 训练过程分析

parallelPoolAdjacentViolators方法用于实现保序回归的训练。parallelPoolAdjacentViolators方法的代码如下:

代码语言:javascript
复制
private def parallelPoolAdjacentViolators(
      input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
    val parallelStepResult = input
      //以(feature,label)为key进行排序
      .sortBy(x => (x._2, x._1))
      .glom()//合并不同分区的数据为一个数组
      .flatMap(poolAdjacentViolators)
      .collect()
      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
    poolAdjacentViolators(parallelStepResult)
  }

parallelPoolAdjacentViolators方法的主要实现是poolAdjacentViolators方法,该方法主要的实现过程如下:

代码语言:javascript
复制
var i = 0
val len = input.length
while (i < len) {
     var j = i
     //找到破坏单调性的元祖的index
     while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
       j = j + 1
     }
     // 如果没有找到违规点,移动到下一个数据点
     if (i == j) {
       i = i + 1
     } else {
       // 否则用pool方法处理违规的节点
       // 并且检查pool之后,之前处理过的节点是否违反了单调性约束
       while (i >= 0 && input(i)._1 > input(i + 1)._1) {
          pool(input, i, j)
          i = i - 1
       }
       i = j
     }
}

pool方法的实现如下所示。

代码语言:javascript
复制
def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
      //取得i到j之间的元组组成的子序列
      val poolSubArray = input.slice(start, end + 1)
      //求子序列sum(label * w)之和
      val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
      //求权重之和
      val weight = poolSubArray.map(_._3).sum
      var i = start
      //子区间的所有元组标签相同,即拥有相同的预测
      while (i <= end) {
        //修改标签值为两者之商
        input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
        i = i + 1
      }
}

  经过上文的处理之后,input根据中的labelfeature均是按升序排列。对于拥有相同预测的点,我们只保留两个特征边界点。

代码语言:javascript
复制
val compressed = ArrayBuffer.empty[(Double, Double, Double)]
var (curLabel, curFeature, curWeight) = input.head
var rightBound = curFeature
def merge(): Unit = {
    compressed += ((curLabel, curFeature, curWeight))
    if (rightBound > curFeature) {
        compressed += ((curLabel, rightBound, 0.0))
    }
}
i = 1
while (i < input.length) {
    val (label, feature, weight) = input(i)
    if (label == curLabel) {
       //权重叠加
       curWeight += weight
       rightBound = feature
    } else {//如果标签不同,合并
       merge()
       curLabel = label
       curFeature = feature
       curWeight = weight
       rightBound = curFeature
    }
    i += 1
}
merge()

  最后将训练的结果保存为模型。

代码语言:javascript
复制
//标签集
val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
//特征集
val boundaries = pooled.map(_._2)
new IsotonicRegressionModel(boundaries, predictions, isotonic)

4.3 预测过程分析

代码语言:javascript
复制
def predict(testData: Double): Double = {
    def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
      y1 + (y2 - y1) * (x - x1) / (x2 - x1)
    }
    //二分查找index
    val foundIndex = binarySearch(boundaries, testData)
    val insertIndex = -foundIndex - 1
    // Find if the index was lower than all values,
    // higher than all values, in between two values or exact match.
    if (insertIndex == 0) {
      predictions.head
    } else if (insertIndex == boundaries.length){
      predictions.last
    } else if (foundIndex < 0) {
      linearInterpolation(
        boundaries(insertIndex - 1),
        predictions(insertIndex - 1),
        boundaries(insertIndex),
        predictions(insertIndex),
        testData)
    } else {
      predictions(foundIndex)
    }
  }

  当测试数据精确匹配一个边界,那么返回相应的特征。如果测试数据比所有边界都大或者小,那么分别返回第一个和最后一个特征。当测试数据位于两个边界之间,使用linearInterpolation方法计算特征。 这个方法是线性内插法。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 保序回归
  • 2 近似保序
  • 3 近似保序算法流程
  • 4 源码分析
    • 4.1 实例
      • 4.2 训练过程分析
        • 4.3 预测过程分析
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档