累加器(accumulator)
累加器是共享变量的一种,它提供了信息聚合的一种方法,提供了将工作节点中的值聚合到驱动器程序中的简单语法,累加器常常被作为Rdd的map,filter操作的副产品,这仍然是由于行动操作之前的转化操作仍然是惰性的,只有进行了行动操作之后,才会触发累加器的求值操作。
spark快速大数据分析
只有在执行了saveAsTextFile之后,累加器blankLines才能保存正确的值。
累加器的使用方法如下:
• 通 过 在 驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始 值 的 累 加 器。 返 回 值 为 org.apache.spark.Accumulator[T] 对 象, 其 中 T 是 初 始 值initialValue 的类型。
• Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add )增加累加器的值。
• 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。
累加器的注意点
分布式平台的下进行聚合的共享变量难免存在多加或者是少加的情况,Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。因此最终结果就是同一个函数可能对同一个数据运行了多次,如果累加器的累加操作在转化操作,那么可能就出现了不止一次的更新,出现了多加的情况。但是如果累加器的累加操作在行动操作,Spark只会把每个任务对各累加器的修改应用一次。
自定义累加器
Spark 直接支持Int, Double 、 Long 和 Float 型的累加器,如果是其他类型的累加器,需要自定义,重新重写初始值和add方法。下面给出Accumulator[Seq[Int]]的代码实现
class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
}
val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())
这里重新了zero和addInPlace两种方法,定义为累加器对Seq的累加为Seq的拼接操作。