前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark程序开发调优(前奏)

Spark程序开发调优(前奏)

作者头像
木野归郎
发布2020-06-15 14:32:34
3250
发布2020-06-15 14:32:34
举报
文章被收录于专栏:share ai happinessshare ai happiness

程序开发开发调优

Spark 性能优化的第一步,就是要在开发 Spark 作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些 Spark 基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的 Spark 作业中。

原则一:避免创建重复的 RDD

通常来说,我们在开发一个 Spark 作业时,首先是基于某个数据源(比如 Hive 表或 HDFS文件)创建一个初始的 RDD;接着对这个 RDD 执行某个算子操作,然后得到下一个 RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个 RDD 会通过不同的算子操作(比如 map、reduce 等)串起来,这个“RDD 串”,就是 RDD lineage,也就是“RDD 的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个 RDD,不能创建多个 RDD来代表同一份数据。

一 些 Spark 初 学 者 在 刚 开 始 开 发 Spark 作 业 时 , 或 者 是 有 经 验 的 工 程 师 在 开 发 RDDlineage 极其冗长的 Spark 作业时,可能会忘了自己之前对于某一份数据已经创建过一个 RDD了,从而导致对于同一份数据,创建了多个 RDD。这就意味着,我们的 Spark 作业会进行多次重复计算来创建多个代表相同数据的 RDD,进而增加了作业的性能开销。

一个简单的例子:

代码语言:javascript
复制
//需要对名为“hello.txt”的 HDFS 文件进行一次 map 操作,再进行一次 reduce 操作。
//也就是说,需要对一份数据执行两次算子操作。
//错误的做法:对于同一份数据执行多次算子操作时,创建多个 RDD。
//这里执行了两次 textFile 方法,针对同一个 HDFS 文件,创建了两个 RDD 出来,
//然后分别对每个 RDD 都执行了一个算子操作。
//这种情况下,Spark 需要从 HDFS 上两次加载 hello.txt 文件的内容,并创建两个单独的 RDD;
//第二次加载 HDFS 文件以及创建 RDD 的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)
//正确的用法:对于一份数据执行多次算子操作时,只使用一个 RDD。
//这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个 RDD,
//然后对这一个 RDD 执行了多次算子操作。
//但是要注意到这里为止优化还没有结束,由于 rdd1 被执行了两次算子操作,第二次执行 reduce 操作的时候,
//还会再次从源头处重新计算一次 rdd1 的数据,因此还是会有重复计算的性能开销。
//要彻底解决这个问题,必须结合“原则三:对多次使用的 RDD 进行持久化”,
//才能保证一个 RDD 被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt"
rdd1.map(...) 
rdd1.reduce(...)

原则二:尽可能复用同一个 RDD

除了要避免在开发过程中对一份完全相同的数据创建多个 RDD 之外,在对不同的数据执行算子操作时还要尽可能地复用一个 RDD。比如说,有一个 RDD 的数据格式是 key-value类型的,另一个是单 value 类型的,这两个 RDD 的 value 数据是完全一样的。那么此时我们可以只使用 key-value 类型的那个 RDD,因为其中已经包含了另一个的数据。对于类似这种多个 RDD 的数据有重叠或者包含的情况,我们应该尽量复用一个 RDD,这样可以尽可能地减少 RDD 的数量,从而尽可能减少算子执行的次数。

一个简单的例子:

代码语言:javascript
复制
// 错误的做法。
// 有一个<long , String>格式的 RDD,即 rdd1。
// 接着由于业务需要,对 rdd1 执行了一个 map 操作,创建了一个 rdd2,
//而 rdd2 中的数据仅仅是 rdd1 中的 value 值而已,也就是说,rdd2 是 rdd1 的子集。
JavaPairRDD<long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
// 分别对 rdd1 和 rdd2 执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
// 正确的做法。
// 上面这个 case 中,其实 rdd1 和 rdd2 的区别无非就是数据格式不同而已,
//rdd2 的数据完全就是 rdd1 的子集而已,却创建了两个 rdd,并对两个 rdd 都执行了一次算子操作。
// 此时会因为对 rdd1 执行 map 算子来创建 rdd2,而多执行一次算子操作,进而增加性能开销。
// 其实在这种情况下完全可以复用同一个 RDD。
// 我们可以使用 rdd1,既做 reduceByKey 操作,也做 map 操作。
// 在进行第二个 map 操作时,只使用每个数据的 tuple._2,也就是 rdd1 中的 value 值,即可。
JavaPairRDD<long , String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
// 第二种方式相较于第一种方式而言,很明显减少了一次 rdd2 的计算开销。
// 但是到这里为止,优化还没有结束,对 rdd1 我们还是执行了两次算子操作,rdd1 实际上还是会被计算两
次。
// 因此还需要配合“原则三:对多次使用的 RDD 进行持久化”进行使用,
//才能保证一个 RDD 被多次使用时只被计算一次。

原则三:对多次使用的 RDD 进行持久化

当你在 Spark 代码中多次对一个 RDD 做了算子操作后,你已经实现 Spark 作业第一步的优化了,也就是尽可能复用 RDD 时就该在这个基础之上,进行第二步优化了,也就是要保证对一个 RDD 执行多次算子操作时,这个 RDD 本身仅仅被计算一次。

Spark 中对于一个 RDD 执行多次算子的默认原理是这样的:每次你对一个 RDD 执行一个算子操作时,都会重新从源头处计算一遍,计算出那个 RDD 来,然后再对这个 RDD 执行你的算子操作。这种方式的性能是很差的。

因此对于这种情况,我们的建议是:对多次使用的 RDD 进行持久化。此时 Spark 就会根据你的持久化策略,将 RDD 中的数据保存到内存或者磁盘中。以后每次对这个 RDD 进行算子操作时,都会直接从内存或磁盘中提取持久化的 RDD 数据,然后执行算子,而不从源头处重新计算一遍这个 RDD,再执行算子操作。

对多次使用的 RDD 进行持久化的代码示例:

代码语言:javascript
复制
// 如果要对一个 RDD 进行持久化,只要对这个 RDD 调用 cache()和 persist()即可。
// 正确的做法。
// cache()方法表示:使用非序列化的方式将 RDD 中的数据全部尝试持久化到内存中。
// 此时再对 rdd1 执行两次算子操作时,只有在第一次执行 map 算子时,才会将这个 rdd1 从源头处计算一次。
// 第二次执行 reduce 算子时,就会直接从内存中提取数据进行计算,不会重复计算一个 rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER 表示,内存充足时优先持久化到内存中,
//内存不充足时持久化到磁盘文件中。
// 而且其中的_SER 后缀表示,使用序列化的方式来保存 RDD 数据,此时 RDD 中的每个 partition
//都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,
//从而发生频繁 GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)

Spark 的持久化级别

原则四:尽量避免使用 shuffle 类算子

如果有可能的话,要尽量避免使用 shuffle 类算子。因为 Spark 作业运行过程中,最消耗性能的地方就是 shuffle 过程。shuffle 过程,简单来说,就是将分布在集群中多个节点上的同一个 key,拉取到同一个节点上,进行聚合或 join 等操作。比如 reduceByKey、join 等算子,都会触发 shuffle 操作。

shuffle 过程中,各个节点上的相同 key 都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同 key。而且相同 key 都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的 key 过多,导致内存不够存放,进而溢写到磁盘文件中。因此在 shuffle 过程中,可能会发生大量的磁盘文件读写的 IO 操作,以及数据的网络传输操作。磁盘 IO 和网络数据传输也是 shuffle 性能较差的主要原因。

因此在 我们的开发过 程中,能避免 则尽可能避 免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle操作或者仅有较少 shuffle 操作的 Spark 作业,可以大大减少性能开销。

Broadcast 与 map 进行 join 代码示例:

代码语言:javascript
复制
// 传统的 join 操作会导致 shuffle 操作。
// 因为两个 RDD 中,相同的 key 都需要通过网络拉取到一个节点上,由一个 task 进行 join 操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map 的 join 操作,不会导致 shuffle 操作。
// 使用 Broadcast 将一个数据量较小的 RDD 作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在 rdd1.map 算子中,可以从 rdd2DataBroadcast 中,获取 rdd2 的所有数据。
// 然后进行遍历,如果发现 rdd2 中某条数据的 key 与 rdd1 的当前数据的 key 是相同的,
//那么就判定可以进行 join。
// 此时就可以根据自己需要的方式,将 rdd1 当前数据与 rdd2 中可以连接的数据,
//拼接在一起(String 或 Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建议仅仅在 rdd2 的数据量比较少(比如几百 M,或者一两 G)的情况下使用。
// 因为每个 Executor 的内存中,都会驻留一份 rdd2 的全量数据。

原则五:广播大变量

有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M 以上的大集合),那么此时就应该使用 Spark 的广播(Broadcast)功能来提升性能。

在算子函数中使用到外部变量时,默认情况下,Spark 会将该变量复制多个副本,通过网络传输到 task 中,此时每个 task 都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至 1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的 Executor 中占用过多内存导致的频繁 GC,都会极大地影响性能。

因此对于上述情况,如果使用的外部变量比较大,建议使用 Spark 的广播功能,对该变量进行广播。广播后的变量,会保证每个 Executor 的内存中,只驻留一份变量副本,而 Executor中的 task 执行时共享该 Executor 中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对 Executor 内存的占用开销,降低 GC 的频率。

广播大变量的代码示例:

代码语言:javascript
复制
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个 task 都会有一份 list1 的副本。
val list1 = ...
rdd1.map(list1...)
// 以下代码将 list1 封装成了 Broadcast 类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前 task 所在 Executor 内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从 Driver 或者其他 Executor 节点上远程拉取一份放到本地 Executor 内存
中。
// 每个 Executor 内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 OnlyCoding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档