4.4 共享变量

4.4 共享变量

一般来说,当一个被传递给Spark操作(例如,Map和Reduce)的函数在一个远程集群上运行时,该函数实际上操作的是它用到的所有变量的独立副本。

这些变量会被复制到每一台机器,在远程机器上对变量的所有更新都不会传回主驱动程序。默认来说,当Spark以多个Task在不同的Worker上并发运行一个函数时,它传递每一个变量的副本并缓存在Worker上,用于每一个独立Task运行的函数中。

有时,我们需要变量能够在任务中共享,或者在任务与驱动程序之间共享。

而Spark提供两种模式的共享变量:广播变量和累加器。Spark的第二个抽象便是可以在并行计算中使用的共享变量。

□广播变量:可以在内存的所有节点中被访问,用于缓存变量(只读);

□累加器:只能用来做加法的变量,如计数和求和。

4.4.1 广播变量

广播变量允许程序员保留一个只读的变量,缓存在每一台Worker节点的Cache,而不是每个Task发送一份副本。例如,可以给每个Worker节点设置一个输入数据集副本,Spark会尝试使用一种高效的广播算法传播广播变量,从而减少通信的代价。

广播变量是通过调用SparkContext.broadcast(v)方法从变量v创建的,广播变量是一个v的封装,它的值可以通过调用value方法获得,代码如下:

            scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value

res0: Array[Int] = Array(1, 2, 3)

在广播变量被创建后,可以在集群运行的任何函数中代替v值被调用,由于v值在第一次调用后缓存到任务节点,重复调用时不需要被再次传递到这些节点上。另外,对象v不能在广播后修改,这样可以保证所有节点收到相同的广播值。

4.4.2 累加器

累加器是一种只能通过关联操作进行“加”操作的变量,因此可以在并行计算中得到高效的支持。类似MapReduce中的counter,可以用来实现计数和求和等功能。Spark原生支持Int和Double类型的累加器,程序员可以自己添加新的支持类型。

累加器可以通过调用SparkContext.accumulator(v)方法从一个初始值v中创建。运行在集群上的任务,可以通过使用+=进行累加,但是不能进行读取。只有主程序可以使用value的方法读取累加器的值。

下面的代码展示了如何利用累加器,将一个数组里面的所有元素相加。

            scala> val accum = sc.accumulator(0)

accum: org.apache.spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

...

*** INFO scheduler.DAGScheduler: Stage 0 finished in 0.111 s

*** INFO spark.SparkContext: Job finished took 0.288603412 s

scala> accum.value

res1: Int = 10

当然,这段代码使用的是累加器内置支持的Int类型,程序员也可以通过创建AccumulatorParam的子类来创建自己的类型。该AccumulatorParam接口有两个方法:提供了一个“zero”值进行初始化,以及一个addInPlace方法将两个值相加,如果需要可以自己尝试需要的类型,如Vector。

4.5 本章小结

总之,RDD是Spark的核心,也是整个Spark的架构基础。RDD是在集群应用中分享数据的一种高效、通用、容错的抽象,是由Spark提供的最重要的抽象的概念,它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编程操作集合的方式,进行各种并行操作。

本章重点讲解了如何创建Spark的RDD,以及RDD的一系列转换和执行操作,并给出一些基于Scala编程语言的支持。并对广播变量和累加器两种模式的共享变量进行了讲解,但是在此仅仅讲解了RDD的基础相关部分,对RDD在执行过程中的依赖转换,以及RDD的可选特征优先计算位置(preferred locations)和分区策略,并没有进行详细描述,在后面的章节中会结合实例对此进行重点讲述。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏屈定‘s Blog

设计模式--动态代理的思考

在一些第三方框架中经常能看到动态代理的案例,尤其是RPC框架,ORM框架等,该篇将分析这些实现的原理,另外延伸在业务中的使用示例.

1323
来自专栏Hadoop实操

使用JDBC向Kudu表插入中文字符-双引号的秘密

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。 1.问题描述 使用Impala JDBC向Kudu表中插入中文字符,插入的中文字符串乱码,中文字...

3817
来自专栏我是攻城师

如何使用Spark大规模并行构建索引

3524
来自专栏数据处理

split函数使用的一个小故障

1688
来自专栏JackieZheng

Hadoop阅读笔记(七)——代理模式

  关于Hadoop已经小记了六篇,《Hadoop实战》也已经翻完7章。仔细想想,这么好的一个框架,不能只是流于应用层面,跑跑数据排序、单表链接等,想得其精髓,...

21510
来自专栏一个爱瞎折腾的程序猿

nodejs常用代码片段

调用:node index.js --target test 接收:const config=loadConifg(['target'],'--') //co...

1292
来自专栏王小雷

MapReduce的过程(2)

MapReduce的编程思想(1) MapReduce的过程(2) 1. MapReduce从输入到输出 一个MapReduce的作业经过了input、map、...

2705
来自专栏用户2442861的专栏

为什么很多类甚者底层源码要implements Serializable ?

在碰到异常类RuntimeException时,发现Throwable实现了 Serializable,还有我们平进的javabean一般也要实现Seriali...

731
来自专栏大内老A

ASP.NET MVC基于标注特性的Model验证:ValidationAttribute

通过前面的介绍我们知道ModelValidatorProviders的静态只读Providers维护着一个全局的ModelValidatorProvider列表...

22010
来自专栏我是攻城师

Spark中foreachPartition和mapPartitions的区别

5765

扫码关注云+社区

领取腾讯云代金券