前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark程序开发调优(后续)

Spark程序开发调优(后续)

作者头像
木野归郎
发布2020-06-15 14:46:00
7690
发布2020-06-15 14:46:00
举报
文章被收录于专栏:share ai happiness

原则六:使用 map-side 预聚合的 shuffle 操作

如果因为业务需要,一定要使用 shuffle 操作,无法用 map 类的算子来替代,那么尽量使用可以 map-side 预聚合的算子。

所谓的 map-side 预聚合,说的是在每个节点本地对相同的 key 进行一次聚合操作,类似于 MapReduce 中的本地 combiner。map-side 预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的 key 都被聚合起来了。其他节点在拉取所有节点上的相同 key 时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘 IO 以及网络传输开销。通常来说,在可能的情况下,建议使用 reduceByKey 或者 aggregateByKey 算子来替代掉 groupByKey 算子。因为 reduceByKey 和 aggregateByKey 算子都会使用用户自定义的函数对每个节点本地的相同key 进行预聚合。而 groupByKey 算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

比如下图,就是典型的例子,分别基于 reduceByKey 和 groupByKey 进行单词计数。其中第一张图是 groupByKey 的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是 reduceByKey 的原理图,可以看到,每个节点本地的相同key 数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

原则七:使用高性能的算子

除了 shuffle 相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。

使用 reduceByKey/aggregateByKey 替代 groupByKey

详情见“原则六:使用 map-side 预聚合的 shuffle 操作”。

使用 mapPartitions 替代普通 map

mapPartitions 类的算子,一次函数调用会处理一个 partition 所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用 mapPartitions 会出现 OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个 partition 所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现 OOM 异常。所以使用这类操作时要慎重!

使用 foreachPartitions 替代 foreach

原理类似于“使用 mapPartitions 替代 map”,也是一次函数调用处理一个 partition 的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions 类的算子,对性能的提升还是很有帮助的。比如在 foreach 函数中,将 RDD 中所有数据写 MySQL,那么如果是普通的 foreach 算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions 算子一次性处理一个 partition 的数据,那么对于每个 partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于 1 万条左右的数据量写 MySQL,性能可以提升 30%以上。

使用 filter 之后进行 coalesce 操作

通常对一个 RDD 执行 filter 算子过滤掉 RDD 中较多数据后(比如 30%以上的数据),建议使用 coalesce 算子,手动减少 RDD 的 partition 数量,将 RDD 中的数据压缩到更少的 partition中去。因为 filter 之后,RDD 的每个 partition 中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个 task 处理的 partition 中的数据量并不是很多,有一点资源浪费,而且此时处理的 task 越多,可能速度反而越慢。因此用 coalesce 减少 partition 数量,将 RDD中的数据压缩到更少的 partition 之后,只要使用更少的 task 即可处理完所有的 partition。在某些场景下,对于性能的提升会有一定的帮助。

使用 repartitionAndSortWithinPartitions 替代 repartition 与 sort 类操作

repartitionAndSortWithinPartitions 是 Spark 官网推荐的一个算子,官方建议,如果需要在 repartition 重分区之后,还要进行排序,建议直接使用 repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的 shuffle 操作,一边进行排序。shuffle 与 sort 两个操作同时进行,比先 shuffle 再 sort 来说,性能可能是要高的。

原则八:使用 Kryo 优化序列化性能

在 Spark 中,主要有三个地方涉及到了序列化:

1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。

2、将自定义的类型作为 RDD 的泛型类型时(比如 JavaRDD,Student 是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable 接口。

3、使用可序列化的持久化策略时(比如 MEMORY_ONLY_SER),Spark 会将 RDD 中的每个 partition 都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用 Kryo 序列化类库,来优化序列化和 反 序 列 化 的 性 能 。 Spark 默 认 使 用 的 是 Java 的 序 列 化 机 制 , 也 就 是ObjectOutputStream/ObjectInputStream API 来进行序列化和反序列化。但是 Spark 同时支持使用 Kryo 序列化库,Kryo 序列化类库的性能比 Java 序列化类库的性能要高很多。官方介绍,Kryo 序列化机制比 Java 序列化机制,性能高 10 倍左右。Spark 之所以默认没有使用 Kryo 作为序列化类库,是因为 Kryo 要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用 Kryo 的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为 RDD 泛型类型的自定义类型等):

代码语言:javascript
复制
// 创建 SparkConf 对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为 KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

原则九:优化数据结构

Java 中,有三种类型比较耗费内存:

1、对象,每个 Java 对象都有对象头、引用等额外的信息,因此比较占用内存空间。

2、字符串,每个字符串内部都有一个字符数组以及长度等额外信息。

3、集合类型,比如 HashMap、LinkedList 等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如 Map.Entry。

因此 Spark 官方建议,在 Spark 编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低 GC 频率,提升性能。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 OnlyCoding 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档