纸上得来终觉浅,绝知此事要躬行!
Spark 计算框架为了能够进行高并发和高吞吐的数据处理,封装了三大数据结构,用于
处理不同的应用场景。三大数据结构分别是:
RDD : 弹性分布式数据集
累加器:分布式共享只写变量
广播变量:分布式共享只读变量
一、累加器
1 实现原理
累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,
在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。
获取系统默认累加器
Spark默认就提供了简单数据聚合的累加器
//系统累加器
val sumAcc: LongAccumulator = sc.longAccumulator("sum")
rdd.foreach(
//累加器使用
num => sumAcc.add(num)
)
//获取累加器的值
println(sumAcc.value)
累加器常见问题:
少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行
一般情况下,累加器会放置在行动算子进行操做
// 自定义累加器
// 1. 继承 AccumulatorV2,并设定泛型
IN:累加器输入的数据类型
OUT:累加器返回的数据类型
// 2. 重写累加器的抽象方法
class WordCountAccumulator extends AccumulatorV2[IN,OUT]
二、广播变量
1 实现原理
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个
或多个 Spark 操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,
广播变量用起来都很顺手。在多个并行操作中使用同一个变量,但是 Spark 会为每个任务
分别发送。
//RDD join会导致数据量几何增长,并且会影啊shuffle的性能,不推荐使用
闭包数据,都是以Task为单位发送的,每个任务中包含闭包数据
这样可能会导致,一个Executor中含有大量重复的数据,并且占用大量的内存
Executor其实就一个JVM,所以在启动时,会自动分配内存
Spark中的广播变量就可以将闭包的数据保存到Executor的内存中,Spark中的广播变量不能够更改:分布式共享只读变量
val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)
val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )
// 声明广播变量
val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)
val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {
case (key, num) => {
var num2 = 0
// 使用广播变量
for ((k, v)
if (k == key) { num2 = v } }
(key, (num, num2)) } }
领取专属 10元无门槛券
私享最新 技术干货