spark源码系列之累加器实现机制及自定义累加器

一,基本概念

累加器是Spark的一种变量,顾名思义该变量只能增加。有以下特点:

1,累加器只能在Driver端构建及并只能是Driver读取结果,Task只能累加。

2,累加器不会改变Spark Lazy计算的特点。只会在Job触发的时候进行相关累加操作。

3,现有累加器的类型。

二,累加器的使用

Driver端初始化,并在Action之后获取值。

val accum = sc.accumulator(0, "test Accumulator") accum.value

Executor端进行计算

accum+=1;

三,累加器的重点类

Class Accumulator extends Accumulable

主要是实现了累加器的初始化及封装了相关的累加器操作方法。同时在类对象构建的时候向我们的Accumulators注册了累加器。累加器的add操作的返回值类型和我们传入的值类型可以不一样。所以,我们一定要定义好如何累加和合并值。也即add方法

object Accumulators:

该方法在Driver端管理着我们的累加器,也包含了特定累加器的聚合操作。

trait AccumulatorParam[T] extends AccumulableParam[T, T]:

AccumulatorParam的addAccumulator操作的泛型封装,具体的实现还是要再具体实现类里面实现addInPlace方法。

object AccumulatorParam:

主要是进行隐式类型转换的操作。

TaskContextImpl:

在Executor端管理着我们的累加器。

四,累加器的源码解析

1,Driver端的初始化

val accum = sc.accumulator(0, "test Accumulator")

val acc = new Accumulator(initialValue, param, Some(name))

主要是在Accumulable(Accumulator)中调用了,这样我们就可以使用Accumulator使用了。

Accumulators.register(this)

2,Executor端的反序列化得到我们对象的过程

首先,我们的value_ 可以看到其并不支持序列化

@volatile @transient private var value_ : R = initialValue // Current value on master

其初始化是在我们反序列化的时候做的,反序列化还完成了Accumulator向我们的TaskContextImpl的注册

反序列化是在调用ResultTask的RunTask方法的时候做的

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

过程中会调用

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() value_ = zero deserialized = true // Automatically register the accumulator when it is deserialized with the task closure. // // Note internal accumulators sent with task are deserialized before the TaskContext is created // and are registered in the TaskContext constructor. Other internal accumulators, such SQL // metrics, still need to register here. val taskContext = TaskContext.get() if (taskContext != null) { taskContext.registerAccumulator(this) } }

3,累加器的累加

accum+=1;

param.addAccumulator(value_, term)

根据不同的累加器参数有不同的实现AccumulableParam

如,int类型。最终调用的AccumulatorParam特质的addAccumulator方法。

trait AccumulatorParam[T] extends AccumulableParam[T, T] { def addAccumulator(t1: T, t2: T): T = { addInPlace(t1, t2) } }

然后,调用的是各个具体实现的addInPlace方法

implicit object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int): Int = 0 }

返回后更新了我们的Accumulators的value_的值。

4,Accumulator的各个节点累加的之后的聚合操作

在Task类的run方法里面得到并返回的

(runTask(context), context.collectAccumulators())

最终在DAGScheduler里面调用了updateAccumulators(event)

在updateAccumulators方法中

Accumulators.add(event.accumUpdates)

具体内容如下:

def add(values: Map[Long, Any]): Unit = synchronized { for ((id, value) <- values) { if (originals.contains(id)) { // Since we are now storing weak references, we must check whether the underlying data // is valid. originals(id).get match { case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value case None => throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") } } else { logWarning(s"Ignoring accumulator update for unknown accumulator id $id") } } }

5,最后我们就可以获取到累加器的值了

accum.value

五,累加器使用注意事项

累加器不会改变我们RDD的Lazy的特性,之后再Action之后完成计算和更新。

但是假如出现两个Action公用一个转化操作,如map,在map里面进行累加器累加,那么每次action都会累加,造成某些我们不需要的结果。

六,自定义累加器

自定义累加器输出

七,总结

主要牵涉点就是序列化及类加载执行,这是深入玩spark的必须,欢迎大家公众号打赏领取相关课程。

视频内容

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-04

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java编程技术

并发队列-无界阻塞延迟队列DelayQueue原理探究

DelayQueue队列中每个元素都有个过期时间,并且队列是个优先级队列,当从队列获取元素时候,只有过期元素才会出队列。

842
来自专栏大内老A

ASP.NET MVC Controller激活系统详解:IoC的应用[下篇]

[上篇]除了通过自定义ControllerFactory的方式引入IoC之外,在使用默认DefaultControllerFactory情况下也可以通过一些扩展...

17610
来自专栏java架构师

Stream篇(1)

最近在看一个写的很好的博客,为了加深记忆,把自认为重要的东西,一边看,一边记在这里 一、什么是流、字节序列、字节 一条河中有一条鱼游过,这条鱼就是一个字节,这个...

2837
来自专栏大内老A

晚绑定场景下对象属性赋值和取值可以不需要PropertyInfo

在《一句代码实现批量数据绑定》中,我通过界面控件ID与作为数据源的实体属性名之间的映射实现了批量数据绑定。由于里面频繁涉及对属性的反射——通过反射从实体对象中获...

23810
来自专栏小樱的经验随笔

POJ 3278 Catch That Cow(BFS,板子题)

Catch That Cow Time Limit: 2000MS Memory Limit: 65536K Total Submissions...

2695
来自专栏菩提树下的杨过

objective-C 的OOP(上)--类定义、继承及方法调用

上一篇展示了如何用传统的“面向过程编程方法”,实现画“矩形”、“圆”、“椭圆”,这一篇看下如何改用OOP的方法来实现: ? 因为要用到“颜色”以及“矩形区域”二...

1568
来自专栏好好学java的技术栈

高并发之并发容器,了解多少(从入门到超神)

在上面已经提到过ConcurrentHashMap,ConcurrentHashMap相比Hashtable能够进一步提高并发性,其原理图如下:

773
来自专栏技术小黑屋

深入剖析 Android中的 ArrayMap

数据集合在任何一门编程语言中都是很重要的一部分,在 Android 开发中,我们会实用到ArrayList, LinkedList, HashMap等。其中Ha...

653
来自专栏Java与Android技术栈

Java8新的异步编程方式 CompletableFuture(二)

上一篇文章,讲述了Future模式的机制、缺点,CompletableFuture产生的由来、静态工厂方法、complete()方法等等。

1021
来自专栏北京马哥教育

深度详解 Python yield与实现

学Python最简单的方法是什么?推荐阅读:Python开发工程师成长魔法 Python yield与实现 yield的功能类似于return,但是不同之处在于...

43212

扫码关注云+社区