前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark系列(五)共享变量累加器

Spark系列(五)共享变量累加器

作者头像
张凝可
发布2019-08-22 10:52:26
4930
发布2019-08-22 10:52:26
举报
文章被收录于专栏:技术圈技术圈技术圈

累加器(accumulator)

累加器是共享变量的一种,它提供了信息聚合的一种方法,提供了将工作节点中的值聚合到驱动器程序中的简单语法,累加器常常被作为Rdd的map,filter操作的副产品,这仍然是由于行动操作之前的转化操作仍然是惰性的,只有进行了行动操作之后,才会触发累加器的求值操作。

spark快速大数据分析

只有在执行了saveAsTextFile之后,累加器blankLines才能保存正确的值。

累加器的使用方法如下:

• 通 过 在 驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始 值 的 累 加 器。 返 回 值 为 org.apache.spark.Accumulator[T] 对 象, 其 中 T 是 初 始 值initialValue 的类型。

• Spark 闭包里的执行器代码可以使用累加器的 += 方法(在 Java 中是 add )增加累加器的值。

• 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。

累加器的注意点

分布式平台的下进行聚合的共享变量难免存在多加或者是少加的情况,Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。因此最终结果就是同一个函数可能对同一个数据运行了多次,如果累加器的累加操作在转化操作,那么可能就出现了不止一次的更新,出现了多加的情况。但是如果累加器的累加操作在行动操作,Spark只会把每个任务对各累加器的修改应用一次。

自定义累加器

Spark 直接支持Int, Double 、 Long 和 Float 型的累加器,如果是其他类型的累加器,需要自定义,重新重写初始值和add方法。下面给出Accumulator[Seq[Int]]的代码实现

class SeqAccumulatorParam[B] extends AccumulatorParam[Seq[B]] {
    override def zero(initialValue: Seq[B]): Seq[B] = Seq[B]()
    override def addInPlace(s1: Seq[B], s2: Seq[B]): Seq[B] = s1 ++ s2
} 
val seqAccum = sc.accumulator(Seq[Int]())(new SeqAccumulatorParam[Int]())

这里重新了zero和addInPlace两种方法,定义为累加器对Seq的累加为Seq的拼接操作。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年03月23日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档