前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Shuffle演进

Spark Shuffle演进

作者头像
大数据真好玩
发布2019-08-08 15:34:34
7180
发布2019-08-08 15:34:34
举报
文章被收录于专栏:暴走大数据

Shuffle就是将不同节点上相同的Key拉取到一个节点的过程。这之中涉及到各种IO,所以执行时间势必会较长。对shuffle的优化也是spark job优化的重点。

1.Hash Shuffle

Spark的Shuffle在1.2之前默认的计算引擎是HashShuffleManager

假设每个executor只有一个core,意味着一个executor只能同时运行一个task。有三个Reducer,每个reducer都会从上游拉取对应block file,每个task会为下游的每一个reducer生成一个block文件,这样算,总的文件个数就是上一个stage的分区数 × 下游的分区数 (如图是12个) 如果分区数比较多,map task就比较多,下个stage的reduce task也比较多,那就会很多小文件产生,IO消耗很大,显然不好。

2. 优化的Hash Shuffle

优化就是复用buffer,也就使输出的block文件合并了。开启合并机制spark.shuffle.consolidateFiles=true

如图,一个core不管有几个task都会复用同一个buffer,这样生成的文件个数即为core × reducer。很明显比优化前少了很多。但如果下游stage的分区很多的话,文件仍然多。

3.Sort Shuffle

在Spark1.2版本之后,出现了SortShuffle,这种方式以更少的中间磁盘文件产生而远远优于HashShuffle。

在该模式下,数据会先写入一个内存数据结构中(默认5M),Map或者Array。如果使reduceByKey类算子,就用Map,join类算子就用Array。每条数据写入内存后就会判断是否达到阈值,如果达到了就溢写磁盘,最后清空内存。shuffle中的定时器会定时会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存 在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,再以默认每批1w条数据通过BufferedOutputStream写入磁盘。 最后,再把这些文件合并成一个文件,并多出一个索引文件来告诉下游task从哪个offset开始读取。 结果生成的文件个数为 map task × 2,已经大大减少了。

3.ByPass机制的Sort Shuffle

满足以下两个条件: shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)。 不是聚合类的shuffle算子(比如reduceByKey)。

在这种机制下,当前stage的task会为每个下游的task都创建临时磁盘文件。将数据按照key值进行hash,然后根据hash值,将key写入对应的磁盘文件中。最终,同样会将所有临时文件依次合并成一个磁盘文件,建立索引。 本质上就是在Hash Shuffle后进行了小文件的合并。相比普通机制的Sort Shuffle,文件个数也是map task × 2,但省去了排序的过程消耗。

— THE END —

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.Hash Shuffle
  • 2. 优化的Hash Shuffle
  • 3.Sort Shuffle
  • 3.ByPass机制的Sort Shuffle
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档