Spark2.x学习笔记:12、Shuffle机制

12、Shuffle机制

12.1 背景

在MapReduce计算框架中,shuffle是连接Map和Reduce之间的桥梁。

Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

12.2 Spark Shuffle

Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑。 为什么Spark计算框架也需要Shuffle机制呢?我们知道,Spark计算框架是在分布式环境下运行,这就意味着不可能在单个进程空间中容纳所有计算需要的数据。数据需要按照key进行分区,然后打散分布到集群的各个进程的存储空间中。这里存在一个问题,不存在唯一的分区方式 满足所有的算子计算要求。比如对数据进行排序时,就需要重新按照一定规则对数据重新分区。Shuffle就是包裹在各种需要重新分区的算子之下的一个对数据集重新分区组合的过程。 由于重新分区需要知道分区规则,而分区规则按照数据的Key通过映射函数进行划分,由数据确定出key的过程就是Map过程。同时Map过程也可以做数据处理,例如join算法中一种经典算法Map Side Join,就是确定数据该放到哪个分区。Shuffle将数据进行收集分配到指定的Reduce分区

每个Reduce Task需要从每个MapTask读取一部分数据,则网络连接数是:M*R,其中M是MapTask数,R是Reduce Task数。也就是Shuffle产生文件数M*N。所以说Shuffle是分布式计算框架的核心数据交换方式,其实现方式直接决定了计算框架的性能和扩展性。

产生Shuffle的算子:join,cogroup,各类ByKey(reduceByKey,groupByKey,sortByKey)。

在Spark中,每个Action算子(Action算子才是真正执行)都转换为一个Job,每个Job转换为一个有向无环图DAG来执行。DAG在每个Stage承接阶段做Shuffle过程,也就是说根据Shuffle划分了各个Stage。

Spark Shuffle可以分为两个阶段:Shuffle Write阶段和Shuffle Read阶段(也称为Shuffle Fetch ,包含了Aggregate)。

(1) Shuffle Write阶段 为了分析方便,假定每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。参见下面HashShuffleManager图示。

(2) shuffle read阶段 shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task为下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

12.3 ShuffleManager

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。 (1)HashShuffleManager 在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,当前stage的task数乘以下一个stage的task数,也就是上面说的M*R。

(2)SortShuffleManager 因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

那么使用SortShuffleManager的Shuffle文件数是Core*R,Core是CPU核心数,我们可以认为Core是常量。这样Shuffle文件数大大减少。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏CRPER折腾记

React 折腾记 - (3) 结合Mobx实现一个比较靠谱的动态tab水平菜单,同时关联侧边栏

1282
来自专栏葡萄城控件技术团队

VS2010 Extension实践(2)

在上一篇(VS2010 Extension (1)实践)里,主要展示了如何使用MEF扩展VS2010,来扩展编辑控制和展现自己的UI;在实现QuickToolb...

1979
来自专栏Youngxj

EMLOG评论获取QQ资料

2154
来自专栏walterlv - 吕毅的博客

当我们使用 MVVM 模式时,我们究竟在每一层里做些什么?

2017-11-29 17:29

701
来自专栏日常学python

使用requests+BeautifulSoup的简单爬虫练习

这是日常学python的第17篇原创文章 上篇文章说了BeautifulSoup库之后,今篇文章就是利用上篇的知识来爬取我们今天的主题网站:猫眼电影top100...

2576
来自专栏自动化测试实战

Flask第36篇——模板项目实战(二)

前面我们利用宏将首页代码进行了第一次优化。如果我们现在还有其他页面,试想一下,首页上面的搜索框

573
来自专栏xingoo, 一个梦想做发明家的程序员

[看图说话] 基于Spark UI性能优化与调试——初级篇

Spark有几种部署的模式,单机版、集群版等等,平时单机版在数据量不大的时候可以跟传统的java程序一样进行断电调试、但是在集群上调试就比较麻烦了...远程断...

2825
来自专栏阿凯的Excel

小小查找键、大大大乐趣

你是不是在高速路口堵车的时候看到本文章呢? 不要问我在干嘛! 今天和大家愉快的分享的是查找键!! 神马?查找键还需要分享? 查找键其实分两类,一种是Ctr...

2064
来自专栏Spring相关

feignClient中修改ribbon的配置

在使用@FeignClient注解的时候 是默认使用了ribbon进行客户端的负载均衡的,默认的是随机的策略,那么如果我们想要更改策略的话,需要修改消费者yml...

1031
来自专栏AI2ML人工智能to机器学习

TF Boy 之初筵 - 机器簇

在 "TF Boy 之初筵 - 分布十三式", 里面,我们提到参数服务器PS+Worker。 我们基本上是在Client里面定义好了TF模型(网络图),然后提交...

711

扫码关注云+社区