首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【技术分享】流式k-means算法

【技术分享】流式k-means算法

原创
作者头像
腾讯云TI平台
修改2019-07-24 15:21:04
2K0
修改2019-07-24 15:21:04
举报
文章被收录于专栏:腾讯云TI平台腾讯云TI平台

本文原作者:尹迪,经授权发布。

| 导语 当数据是以流的方式到达的时候,我们可能想动态的估计(estimate )聚类的簇,通过新的到达的数据来更新聚类。`spark.mllib`支持流式`k-means`聚类,并且可以通过参数控制估计衰减(decay)(或“健忘”(forgetfulness))。这个算法使用一般地小批量更新规则来更新簇。

1 流式k-means算法原理

  对每批新到的数据,我们首先将点分配给距离它们最近的簇,然后计算新的数据中心,最后更新每一个簇。使用的公式如下所示:

  在上面的公式中,ctct表示前一个簇中心,ntnt表示分配给这个簇的点的数量, xtxt表示从当前批数据的簇中心,mtmt表示当前批数据的点数量。 当评价新的数据时,把衰减因子alpha当做折扣加权应用到当前的点上,用以衡量当前预测的簇的贡献度量。当alpha等于1时,所有的批数据赋予相同的权重,当alpha等于0时,数据中心点完全通过当前数据确定。

  衰减因子alpha也可以通过halfLife参数联合时间单元(time unit)来确定,时间单元可以是一批数据也可以是一个数据点。假如数据从t时刻到来并定义了halfLifeh, 在t+h时刻,应用到t时刻的数据的折扣(discount)为0.5。

  流式k-means算法的步骤如下所示:

  • (1)分配新的数据点到离其最近的簇;
  • (2)根据时间单元(time unit)计算折扣(discount)值,并更新簇权重;
  • (3)应用更新规则;
  • (4)应用更新规则后,有些簇可能消失了,那么切分最大的簇为两个簇。

2 流式k-means算法源码分析

  在分步骤分析源码之前,我们先了解一下StreamingKMeans参数表达的含义。

class StreamingKMeans(
    var k: Int, //簇个数
    var decayFactor: Double,//衰减因子
    var timeUnit: String //时间单元
)

  在上述定义中,k表示我们要聚类的个数,decayFactor表示衰减因子,用于计算折扣,timeUnit表示时间单元,时间单元既可以是一批数据(StreamingKMeans.BATCHES)也可以是单条数据(StreamingKMeans.POINTS)。

  由于我们处理的是流式数据,所以我们在流式数据来之前要先初始化模型。有两种初始化模型的方法,一种是直接指定初始化中心点及簇权重,一种是随机初始化中心点以及簇权重。

 //直接初始化中心点及簇权重
 def setInitialCenters(centers: Array[Vector], weights: Array[Double]): this.type = {
    model = new StreamingKMeansModel(centers, weights)
    this
 }
 //随机初始化中心点以及簇权重
 def setRandomCenters(dim: Int, weight: Double, seed: Long = Utils.random.nextLong): this.type = {
     val random = new XORShiftRandom(seed)
     val centers = Array.fill(k)(Vectors.dense(Array.fill(dim)(random.nextGaussian())))
     val weights = Array.fill(k)(weight)
     model = new StreamingKMeansModel(centers, weights)
     this
 }

  初始化中心点以及簇权重之后,对于新到的流数据,我们使用更新规则修改中心点和权重,调整聚类情况。更新过程在update方法中实现,下面我们分步骤分析该方法。

  • (1)分配新到的数据到离其最近的簇,并计算更新后的簇的向量和以及点数量
 //选择离数据点最近的簇
 val closest = data.map(point => (this.predict(point), (point, 1L)))
 def predict(point: Vector): Int = {
     //返回和给定点相隔最近的中心
     KMeans.findClosest(clusterCentersWithNorm, new VectorWithNorm(point))._1
 }
 // 获得更新的簇的向量和以及点数量
 val mergeContribs: ((Vector, Long), (Vector, Long)) => (Vector, Long) = (p1, p2) => {
   // y += a * x,向量相加
   BLAS.axpy(1.0, p2._1, p1._1)
   (p1._1, p1._2 + p2._2)
 }
 val pointStats: Array[(Int, (Vector, Long))] = closest
    .aggregateByKey((Vectors.zeros(dim), 0L))(mergeContribs, mergeContribs)
    .collect()
  • (2)获取折扣值,并用折扣值作用到权重上
 // 折扣
 val discount = timeUnit match {
    case StreamingKMeans.BATCHES => decayFactor
    case StreamingKMeans.POINTS =>
      //所有新增点的数量和
      val numNewPoints = pointStats.view.map { case (_, (_, n)) =>
          n
      }.sum
    // x^y
    math.pow(decayFactor, numNewPoints)
 }
 //将折扣应用到权重上
 //x = a * x
 BLAS.scal(discount, Vectors.dense(clusterWeights))

  上面的代码更加时间单元的不同获得不同的折扣值。当时间单元为StreamingKMeans.BATCHES时,折扣就为衰减因子;当时间单元为StreamingKMeans.POINTS时,折扣由新增数据点的个数n和衰减因子decay共同决定。 折扣值为ndecay相乘。

  • (3)实现更新规则
// 实现更新规则
pointStats.foreach { case (label, (sum, count)) =>
   //获取中心点
   val centroid = clusterCenters(label)
   //更新权重
   val updatedWeight = clusterWeights(label) + count
   val lambda = count / math.max(updatedWeight, 1e-16)
   clusterWeights(label) = updatedWeight
   //x = a * x,即(1-lambda)*centroid
   BLAS.scal(1.0 - lambda, centroid)
   // y += a * x,即centroid +=sum*lambda/count
   BLAS.axpy(lambda / count, sum, centroid)
}

  上面的代码对每一个簇,首先更新簇的权重,权重值为原有的权重加上新增数据点的个数。然后计算lambda,通过lambda更新中心点。lambda为新增数据的个数和更新权重的商。 假设更新之前的中心点为c1,更新之后的中心点为c2,那么c2=(1-lambda)*c1+sum/count,其中sum/count为所有点的平均值。

  • (4)调整权重最小和最大的簇
 val weightsWithIndex = clusterWeights.view.zipWithIndex
 //获取权重值最大的簇
 val (maxWeight, largest) = weightsWithIndex.maxBy(_._1)
 //获取权重值最小的簇
 val (minWeight, smallest) = weightsWithIndex.minBy(_._1)
 //判断权重最小的簇是否过小,如果过小,就将这两个簇重新划分为两个新的簇,权重为两者的均值
 if (minWeight < 1e-8 * maxWeight) {
      logInfo(s"Cluster $smallest is dying. Split the largest cluster $largest into two.")
      val weight = (maxWeight + minWeight) / 2.0
      clusterWeights(largest) = weight
      clusterWeights(smallest) = weight
      val largestClusterCenter = clusterCenters(largest)
      val smallestClusterCenter = clusterCenters(smallest)
      var j = 0
      while (j < dim) {
        val x = largestClusterCenter(j)
        val p = 1e-14 * math.max(math.abs(x), 1.0)
        largestClusterCenter.toBreeze(j) = x + p
        smallestClusterCenter.toBreeze(j) = x - p
        j += 1
      }
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 流式k-means算法原理
  • 2 流式k-means算法源码分析
相关产品与服务
腾讯云 TI 平台
腾讯云 TI 平台(TencentCloud TI Platform)是基于腾讯先进 AI 能力和多年技术经验,面向开发者、政企提供的全栈式人工智能开发服务平台,致力于打通包含从数据获取、数据处理、算法构建、模型训练、模型评估、模型部署、到 AI 应用开发的产业 + AI 落地全流程链路,帮助用户快速创建和部署 AI 应用,管理全周期 AI 解决方案,从而助力政企单位加速数字化转型并促进 AI 行业生态共建。腾讯云 TI 平台系列产品支持公有云访问、私有化部署以及专属云部署。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档