前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 性能优化——和 shuffle 搏斗

Spark 性能优化——和 shuffle 搏斗

作者头像
四火
发布2022-07-19 13:53:07
2810
发布2022-07-19 13:53:07
举报
文章被收录于专栏:四火的唠叨

Spark 的性能分析和调优很有意思,今天再写一篇。主要话题是 shuffle,当然也牵涉一些其他代码上的小把戏。

以前写过一篇文章,比较了几种不同场景的性能优化,包括 portal 的性能优化,web service 的性能优化,还有 Spark job 的性能优化。Spark 的性能优化有一些特殊的地方,比如实时性一般不在考虑范围之内,通常我们用 Spark 来处理的数据,都是要求异步得到结果的数据;再比如数据量一般都很大,要不然也没有必要在集群上操纵这么一个大家伙,等等。事实上,我们都知道没有银弹,但是每一种性能优化场景都有一些特定的 “大 boss”,通常抓住和解决大 boss 以后,能解决其中一大部分问题。比如对于 portal 来说,是页面静态化,对于 web service 来说,是高并发(当然,这两种可以说并不确切,这只是针对我参与的项目总结的经验而已),而对于 Spark 来说,这个大 boss 就是 shuffle。

首先要明确什么是 shuffle。Shuffle 指的是从 map 阶段到 reduce 阶段转换的时候,即 map 的 output 向着 reduce 的 input 映射的时候,并非节点一一对应的,即干 map 工作的 slave A,它的输出可能要分散跑到 reduce 节点 A、B、C、D …… X、Y、Z 去,就好像 shuffle 的字面意思 “洗牌” 一样,这些 map 的输出数据要打散然后根据新的路由算法(比如对 key 进行某种 hash 算法),发送到不同的 reduce 节点上去。(下面这幅图来自 《Spark Architecture: Shuffle》

为什么说 shuffle 是 Spark job 的大 boss,就是因为 Spark 本身的计算通常都是在内存中完成的,比如这样一个 map 结构的 RDD:(String, Seq),key 是字符串,value 是一个 Seq,如果只是对 value 进行一一映射的 map 操作,比如(1)先计算 Seq 的长度,(2)再把这个长度作为元素添加到 Seq 里面去。这两步计算,都可以在 local 完成,而事实上也是在内存中操作完成的,换言之,不需要跑到别的 node 上去拿数据,因此执行的速度是非常快的。但是,如果对于一个大的 rdd,shuffle 发生的时候,就会因为网络传输、数据序列化/反序列化产生大量的磁盘 IO 和 CPU 开销。这个性能上的损失是非常巨大的。

要减少 shuffle 的开销,主要有两个思路:

  1. 减少 shuffle 次数,尽量不改变 key,把数据处理在 local 完成;
  2. 减少 shuffle 的数据规模。

先去重,再合并

比如有 A、B 这样两个规模比较大的 RDD,如果各自内部有大量重复,那么二者一合并,再去重:

代码语言:javascript
复制
A.union(B).distinct()

这样的操作固然正确,但是如果可以先各自去重,再合并,再去重,可以大幅度减小 shuffle 的开销(注意 Spark 的默认 union 和 Oracle 里面的 “union all” 很像——不去重):

代码语言:javascript
复制
A.distinct().union(B.distinct()).distinct()

看起来变复杂了对不对,但是当时我解决这个问题的时候,用第二种方法时间开销从 3 个小时减到 20 分钟。

如果中间结果 rdd 如果被调用多次,可以显式调用 cache() 和 persist(),以告知 Spark,保留当前 rdd。当然,即便不这么做,Spark 依然存放不久前计算过的结果(以下来自官方指南):

Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.

数据量大,并不一定慢。通常情况下,由于 Spark 的 job 是放到内存里面进行运算的,因此一个复杂的 map 操作不一定执行起来很慢。但是如果牵涉到 shuffle,这里面有网络传输和序列化的问题,就有可能非常慢。

类似地,还有 filter 等等操作,目的也是要先对大的 RDD 进行 “瘦身” 操作,然后在做其他操作。

mapValues 比 map 好

明确 key 不会变的 map,就用 mapValues 来替代,因为这样可以保证 Spark 不会 shuffle 你的数据:

代码语言:javascript
复制
A.map{case (A, ((B, C), (D, E))) => (A, (B, C, E))}

改成:

代码语言:javascript
复制
A.mapValues{case ((B, C), (D, E)) => (B, C, E)}

用 broadcast + filter 来代替 join

这种优化是一种特定场景的神器,就是拿大的 RDD A 去 join 一个小的 RDD B,比如有这样两个 RDD:

  • A 的结构为 (name, age, sex),表示全国人民的 RDD,超大
  • B 的结果为 (age, title),表示 “年龄 -> 称号” 的映射,比如 60 岁有称号 “花甲之年”,70 岁则是 “古稀之年”,这个 RDD 显然很小,因为人的年龄范围在 0~200 岁之间,而且有的 “年龄” 还没有 “称号”

现在我要从全国人民中找出这些有称号的人来。如果直接写成:

代码语言:javascript
复制
A.map{case (name, age, sex) => (age, (name, sex))}
 .join(B)
 .map{case (age, ((name, sex), title)) => (name, age, sex)}

你就可以想象,执行的时候超大的 A 被打散和分发到各个节点去。而且更要命的是,为了恢复一开始的 (name, age, sex) 的结构,又做了一次 map,而这次 map 一样导致 shuffle。两次 shuffle,太疯狂了。但是如果这样写:

代码语言:javascript
复制
val b = sc.broadcast(B.collectAsMap)
A.filter{case (name, age, sex) => b.values.contains(age)}

一次 shuffle 都没有,A 老老实实待着不动,等着全量的 B 被分发过来。

另外,在 Spark SQL 里面直接有 BroadcastHashJoin,也是把小的 rdd 广播出去。

不均匀的 shuffle

在工作中遇到这样一个问题,需要转换成这样一个非常巨大的 RDD A,结构是 (countryId, product),key 是国家 id,value 是商品的具体信息。当时在 shuffle 的时候,这个 hash 算法是根据 key 来选择节点的,但是事实上这个 countryId 的分布是极其不均匀的,大部分商品都在美国(countryId=1),于是我们通过 Ganglia 看到,其中一台 slave 的 CPU 特别高,计算全部聚集到那一台去了。

找到原因以后,问题解决就容易了,要么避免这个 shuffle,要么改进一下 key,让它的 shuffle 能够均匀分布(比如可以拿 countryId+商品名称的 tuple 作 key,甚至生成一个随机串)。

明确哪些操作必须在 master 完成

如果想打印一些东西到 stdout 里去:

代码语言:javascript
复制
A.foreach(println)

想把 RDD 的内容逐条打印出来,但是结果却没有出现在 stdout 里面,因为这一步操作被放到 slave 上面去执行了。其实只需要 collect 一下,这些内容就被加载到 master 的内存中打印了:

代码语言:javascript
复制
A.collect.foreach(println)

再比如,如果遇到 RDD 操作嵌套的情况,通常考虑优化掉,因为只有 master 才能去理解和执行 RDD 的操作,slave 只能处理被分配的 task 而已。比如:

代码语言:javascript
复制
A.map{case (keyA, valueA) => doSomething(B.lookup(keyA).head, valueA)}

就可以用 join 来代替:

代码语言:javascript
复制
A.join(B).map{case (key, (valueA, valueB)) => doSomething(valueB, valueA)}

用 reduceByKey 代替 groupByKey

这一条应该是比较经典的了。reduceByKey 会在当前节点(local)中做 reduce 操作,也就是说,会在 shuffle 前,尽可能地减小数据量。而 groupByKey 则不是,它会不做任何处理而直接去 shuffle。当然,有一些场景下,功能上二者并不能互相替换。因为 reduceByKey 要求参与运算的 value,并且和输出的 value 类型要一样,但是 groupByKey 则没有这个要求。

有一些类似的 xxxByKey 操作,都比 groupByKey 好,比如 foldByKey 和 aggregateByKey。

另外,还有一条类似的是用 treeReduce 来代替 reduce,主要是用于单个 reduce 操作开销比较大,可以条件 treeReduce 的深度来控制每次 reduce 的规模。

文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档