通常,当传递给Spark操作(例如map or reduce)的函数在远程集群节点上执行时,它可以在函数中使用的所有变量的单独副本上工作。这些变量被复制到每个机器,并且远程机器上的变量的更新都不会被传播回到驱动程序。在任务之间支持一般的,读写共享变量将是低效的。然而,Spark 为两种常用的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
累加器是一种只能通过关联操作进行“加(add)”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。工作节点上的任务不能够访问累加器的值。从这些任务的角度来看,累计器只是一个只写变量。在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。通俗的说就是:累加器可以看成是一个集群规模级别的一个大变量
a:Spark内置的提供了Long和Double类型的累加器。
LongAccumulator longAccumulator = jsc.sc().longAccumulator();
DoubleAccumulator doubleAccumulator = jsc.sc().doubleAccumulator();
b:主要方法介绍
add方法:赋值操作
value方法:获取累加器中的值
merge方法:该方法特别重要,一定要写对,这个方法是各个task的累加器进行合并的方法(下面介绍执行流程中将要用到)
iszero方法:判断是否为初始值
reset方法:重置累加器中的值
copy方法:拷贝累加器
c:spark中累加器的执行流程
首先有几个task,spark engine就调用copy方法拷贝几个累加器(不注册的),然后在各个task中进行累加(注意在此过程中,被最初注册的累加器的值是不变的),执行最后将调用merge方法和各个task的结果累计器进行合并(此时被注册的累加器是初始值)
d:使用累加器需要注意的点()
1:只有在行动操作中才会触发累加器,也就是说如:flatMap()转换操作因为Spark惰性特征所以只用当执行行动操作(如:count等)时累加器才会被触发;累加器只有在驱动程序中才可访问,worker节点中的任务不可访问累加器中的值.
2:使用Accumulator时,为了保证准确性,只使用一次action操作。如果需要使用多次则使用cache或persist操作切断依赖
1:简单实用demo
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
accum.value();// returns 10
2:自定义一个累加器(2.0后提供了AccumulatorV2)看代码
MyAccumulatorV2是自己定义的一个累加器类型,里面是Set
public class MyAccumulatorV2 extends AccumulatorV2<String, Set> {
private Set set=new HashSet<>();
@Override
public boolean isZero() {
return set.isEmpty();
}
@Override
public AccumulatorV2<String, Set> copy() {
MyAccumulatorV2 myAccumulatorV2=new MyAccumulatorV2();
synchronized(myAccumulatorV2){
myAccumulatorV2.set.addAll(set);
}
return myAccumulatorV2;
}
@Override
public void reset() {
set.clear();
}
@Override
public void add(String s) {
set.add(s);
}
@Override
public void merge(AccumulatorV2<String, Set> accumulatorV2) {
set.addAll(accumulatorV2.value());
}
@Override
public Set value() {
return set;
}
}
public class MyAccumulatorV2Main {
private static JavaSparkContext jsc;
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("MyAccumulatorV2Main");
jsc = new JavaSparkContext(conf);
jsc.setLogLevel("ERROR");
MyAccumulatorV2 myAccumulatorV2=new MyAccumulatorV2();
jsc.sc().register(myAccumulatorV2, "myAccumulatorV2");
JavaRDD rdd = jsc.parallelize(Arrays.asList("A","B","C"), 5).cache();
rdd.foreach(x ->myAccumulatorV2.add(x)); //将字符串进行add操作
myAccumulatorV2.value();
MyAccumulatorV2 myAccumulatorV3= (MyAccumulatorV2) myAccumulatorV2.copy();//copy一个对象
myAccumulatorV3.value();
}
}
注:有人在网上提出过webui页面上看怎么看spark累加器,本人期初的时候也没找到怎么看,后来才知道,需要自定义累加器名称后才能看到jsc.sc().register(myAccumulatorV2, "myAccumulatorV2"); 后面引号里面的才是累加器名称,这样才能看。图
如果不是自定义累加器需要这样定义:
LongAccumulator accum =jsc.sc().longAccumulator("longAccumulatorname"); 这才能在页面上看到longAccumulatorname的信息。
Spark提供的广播变量可以解决闭包函数引用外部大变量引起的性能问题;广播变量将只读变量缓存在每个worker节点中,Spark使用了高效广播算法分发变量从而提高通信性能;如直接在闭包函数中使用外部 变量该变量会缓存在每个任务(jobTask)中如果多个任务同时使用了一个大变量势必会影响到程序性能;广播变量:每个worker节点中缓存一个副本,通过高效广播算法提高传输效率,广播变量是只读的;Spark Scala Api与Java Api默认使用了Jdk自带序列化库,通过使用第三方或使用自定义的序列化库还可以进一步提高广播变量的性能。看个demo
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastDemo"); sc = new JavaSparkContext(conf); //Arrays中含义是 "性别编码,名字" 如"1,张三" JavaRDDlinesRDD = sc.parallelize(Arrays.asList("1,张三", "0,李梅", "3,王五")); MapsexMap = new HashMap(); sexMap.put("1", "男人"); sexMap.put("0", "女人"); //声明一个广播变量,值是sexMap, Broadcast<Map> sexMapBC = sc.broadcast(sexMap); JavaRDDretRDD = linesRDD.map(new Function() { @Override public String call(String line) throws Exception { String[] splits = line.split(","); String sid = splits[0].trim(); //获取广播变量中的值,如果找不到就是 “未知” String sName = sexMapBC.value().getOrDefault(sid, "未知"); return splits[1]+"是 " + sName + " " ; } }).cache(); retRDD.foreach(str -> System.out.println(str));
广播变量的优势:是因为不是每个task一份变量副本,而是变成每个节点的executor才一份副本。这样的话,就可以让变量产生的副本大大减少。
简单解释就是:上面demo定义了一个sexMapBC的广播变量,这个变量每台work上只存一份,然后该work上的所有task共享这个变量
如图
左变没有采用广播变量,右边采用了广播变量。 左每个task都有一个副本,右边只有worker上一个副本。
网上的一个例子
50个Executor 1000个task。 默认情况下,1000个task 1000个副本 1000 * 10M = 10 000M = 10 G 10G的数据,网络传输,在集群中,耗费10G的内存资源。 如果使用 广播变量, 50个Executor ,50个副本,10M*50 = 500M的数据。 网络传输,而且不一定是从Drver传输到各个节点,还可能是从就近的节点 的Executor的BlockManager上获取变量副本,网络传输速度大大增加。 之前 10000M 现在 500M。 20倍网络传输性能的消耗。20倍内存消耗的减少。 虽然说,不一定会对性能产生决定向性的作用。比如运行30分钟的spark作业,可能做了广播变量以后,速度快了2分钟。变成28分钟。
注意一点:广播变量创建后,它可以运行在集群中的任何Executor上,而不需要多次传递给集群节点。另外需要记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。