3.6 Shuffle机制

3.6 Shuffle机制

在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。

3.6.1 什么是Shuffle

Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。

在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:

1)保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。

2)每个分片对应一个文件(现在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。

因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。

在Spark中,任务通常分为两种,Shuffle mapTask和reduceTask,具体逻辑如图3-11所示:

[插图]

图3-11 Spark Shuffl e

图3-11中的主要逻辑如下:

1)首先每一个MapTask会根据ReduceTask的数量创建出相应的bucket, bucket的数量是M×R,其中M是Map的个数,R是Reduce的个数。

2)其次MapTask产生的结果会根据设置的partition算法填充到每个bucket中。这里的partition算法是可以自定义的,当然默认的算法是根据key哈希到不同的bucket中。

当ReduceTask启动时,它会根据自己task的id和所依赖的Mapper的id从远端或本地的block manager中取得相应的bucket作为Reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket可以对应一个文件,可以对应文件的一部分或是其他等。Spark shuffle可以分为两部分:

1)将数据分成bucket,并将其写入磁盘的过程称为Shuffle Write。

2)在存储Shuffle数据的节点Fetch数据,并执行用户定义的聚集操作,这个过程称为Shuffle Fetch。

3.6.2 Shuffle历史及细节

下面介绍Shuffle Write与Fetch。

1. Shuffle Write

在Spark的早期版本实现中,Spark在每一个MapTask中为每个ReduceTask创建一个bucket,并将RDD计算结果放进bucket中。

但早期的Shuffle Write有两个比较大的问题。

1)Map的输出必须先全部存储到内存中,然后写入磁盘。这对内存是非常大的开销,当内存不足以存储所有的Map输出时就会出现OOM(Out of Memory)。

2)每个MapTask会产生与ReduceTask数量一致的Shuffle文件,如果MapTask个数是1k, ReduceTask个数也是1k,就会产生1M个Shuffle文件。这对于文件系统是比较大的压力,同时在Shuffle数据量不大而Shuffle文件又非常多的情况下,随机写也会严重降低IO的性能。

后来到了Spark 0.8版实现时,显著减少了Shuffle的内存压力,现在Map输出不需要先全部存储在内存中,再flush到硬盘,而是record-by-record写入磁盘中。对于Shuffle文件的管理也独立出新的ShuffleBlockManager进行管理,而不是与RDD cache文件在一起了。

但是Spark 0.8版的Shuffle Write仍然有两个大的问题没有解决。

1)Shuffle文件过多的问题。这会导致文件系统的压力过大并降低IO的吞吐量。

2)虽然Map输出数据不再需要预先存储在内存中然后写入磁盘,从而显著减少了内存压力。但是新引入的DiskObjectWriter所带来的buffer开销也是不容小视的内存开销。假定有1k个MapTask和1k个ReduceTask,就会有1M个bucket,相应地就会有1M个write handler,而每一个write handler默认需要100KB内存,那么总共需要100GB内存。这样仅仅是buffer就需要这么多的内存。因此当ReduceTask数量很多时,内存开销会很大。

为了解决shuffle文件过多的情况,Spark后来引入了新的Shuffle consolidation,以期显著减少Shuffle文件的数量。

Shuffle consolidation的原理如图3-12所示:

[插图]

图3-12 Shuffl e consolidation

在图3-12中,假定该job有4个Mapper和4个Reducer,有2个core能并行运行两个task。可以算出Spark的Shuffle Write共需要16个bucket,也就有了16个write handler。在之前的Spark版本中,每个bucket对应一个文件,因此在这里会产生16个shuffle文件。

而在Shuffle consolidation中,每个bucket并非对应一个文件,而是对应文件中的一个segment。同时Shuffle consolidation产生的Shuffle文件数量与Spark core的个数也有关系。在图3-12中,job中的4个Mapper分为两批运行,在第一批2个Mapper运行时会申请8个bucket,产生8个Shuffle文件;而在第二批Mapper运行时,申请的8个bucket并不会再产生8个新的文件,而是追加写到之前的8个文件后面,这样一共就只有8个Shuffle文件,而在文件内部共有16个不同的segment。因此从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number, R是Reducer的个数。

很显然,当M=C时,Shuffle consolidation产生的文件数和之前的实现相同。

Shuffle consolidation显著减少了Shuffle文件的数量,解决了Spark之前实现中一个比较严重的问题。但是Writer handler的buffer开销过大依然没有减少,若要减少Writer handler的buffer开销,只能减少Reducer的数量,但是这又会引入新的问题。

2. Shuffle Fetch与Aggregator

Shuffle Write写出去的数据要被Reducer使用,就需要Shuffle Fetch将所需的数据Fetch过来。这里的Fetch操作包括本地和远端,因为Shuffle数据有可能一部分是存储在本地的。在早期版本中,Spark对Shuffle Fetcher实现了两套不同的框架:NIO通过socket连接Fetch数据;OIO通过netty server去fetch数据。分别对应的类是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。

目前在Spark1.5.0中做了优化。新版本定义了类ShuffleBlockFetcherIterator来完成数据的fetch。对于local的数据,ShuffleBlockFetcherIterator会通过local的BlockMan-ager来fetch。对于远端的数据块,它通过BlockTransferService类来完成。具体实现参见如下代码:

              [ShuffleBlockFetcherIterator.scala]

/* fetch local数据块 */

private[this] def fetchLocalBlocks() {

在MapReduce的Shuffle过程中,Shuffle fetch过来的数据会进行归并排序(merge sort),使得相同key下的不同value按序归并到一起供Reducer使用,这个过程如图3-13所示:

[插图]

图3-13 Fetch merge

这些归并排序都是在磁盘上进行的,这样做虽然有效地控制了内存使用,但磁盘IO却大幅增加了。虽然Spark属于MapReduce体系,但是对传统的MapReduce算法进行了一定的改变。Spark假定在大多数应用场景下,Shuffle数据的排序不是必须的,如word count。强制进行排序只会使性能变差,因此Spark并不在Reducer端做归并排序。既然没有归并排序,那Spark是如何进行reduce的呢?这就涉及下面要讲的Shuffle Aggregator了。

Aggregator本质上是一个hashmap,它是以map output的key为key,以任意所要combine的类型为value的hashmap。

在做word count reduce计算count值时,它会将Shuffle fetch到的每一个key-value对更新或是插入hashmap中(若在hashmap中没有查找到,则插入其中;若查找到,则更新value值)。这样就不需要预先把所有的key-value进行merge sort,而是来一个处理一个,省去了外部排序这一步骤。但同时需要注意的是,reducer的内存必须足以存放这个partition的所有key和count值,因此对内存有一定的要求。

在上面word count的例子中,因为value会不断地更新,而不需要将其全部记录在内存中,因此内存的使用还是比较少的。考虑一下如果是groupByKey这样的操作,Reducer需要得到key对应的所有value。在Hadoop MapReduce中,由于有了归并排序,因此给予Reducer的数据已经是group by key了,而Spark没有这一步,因此需要将key和对应的value全部存放在hashmap中,并将value合并成一个array。可以想象为了能够存放所有数据,用户必须确保每一个partition小到内存能够容纳,这对于内存是非常严峻的考验。因此在Spark文档中,建议用户涉及这类操作时尽量增加partition,也就是增加Mapper和Reducer的数量。

增加Mapper和Reducer的数量固然可以减小partition的大小,使内存可以容纳这个partition。但是在Shuffle write中提到,bucket和对应于bucket的write handler是由Mapper和Reducer的数量决定的,task越多,bucket就会增加得更多,由此带来write handler所需的buffer也会更多。在一方面我们为了减少内存的使用采取了增加task数量的策略,另一方面task数量增多又会带来buffer开销更大的问题,因此陷入了内存使用的两难境地。

为了减少内存的使用,只能将Aggregator的操作从内存移到磁盘上进行,因此Spark新版本中提供了外部排序的实现,以解决这个问题。

Spark将需要聚集的数据分为两类:不需要归并排序和需要归并排序的数据。对于前者,在内存中的AppendOnlyMap中对数据聚集。对于需要归并排序的数据,现在内存中进行聚集,当内存数据达到阈值时,将数据排序后写入磁盘。事实上,磁盘上的数据只是全部数据的一部分,最后将磁盘数据全部进行归并排序和聚集。具体Aggregator的逻辑可以参见Aggregator类的实现。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

4.0Spark编程模型RDD

Spark核心技术与高级应用 第4章 编程模型 不自见,故明;不自是,故彰;不自伐,故有功;不自矜,故能长。 ——《道德经》第二十二章 在面对自我的问题上,不...

3099
来自专栏悦思悦读

Spark为什么比Hadoop快那么多?

在2014年11月5日举行的Daytona Gray Sort 100TB Benchmark竞赛中,Databricks 用构建于206个运算节点之上的spa...

53611
来自专栏about云

spark零基础学习线路指导【包括spark2】

问题导读 1.你认为spark该如何入门? 2.你认为spark入门编程需要哪些步骤? 3.本文介绍了spark哪些编程知识?

1083
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

49412
来自专栏灯塔大数据

每周学点大数据 | No.73 在 HDFS 上使用 Spark

编者按:灯塔大数据将每周持续推出《从零开始学大数据算法》的连载,本书为哈尔滨工业大学著名教授王宏志老师的扛鼎力作,以对话的形式深入浅出的从何为大数据说到大数据算...

3777
来自专栏Albert陈凯

2.0Spark编程模型

循序渐进学Saprk 与Hadoop相比,Spark最初为提升性能而诞生。Spark是Hadoop MapReduce的演化和改进,并兼容了一些数据库的基本思想...

3918
来自专栏大数据架构

Adaptive Execution 让 Spark SQL 更高效更智能

前面《Spark SQL / Catalyst 内部原理 与 RBO》与《Spark SQL 性能优化再进一步 CBO 基于代价的优化》介绍的优化,从查询本身与...

1281
来自专栏肖力涛的专栏

Spark 踩坑记:从 RDD 看集群调度

本文的思路是从spark最细节的本质,即核心的数据结构RDD出发,到整个Spark集群宏观的调度过程做一个整理归纳,从微观到宏观两方面总结,方便自己在调优过程中...

1.1K2
来自专栏xingoo, 一个梦想做发明家的程序员

Structured Streaming教程(1) —— 基本概念与使用

在有过1.6的streaming和2.x的streaming开发体验之后,再来使用Structured Streaming会有一种完全不同的体验,尤其是在代码设...

1511
来自专栏Java技术栈

Tomcat集群session复制与Oracle的坑。。

问题描述 公司某个系统使用了tomcat自带的集群session复制功能,然后后报了一个oracle驱动包里面的连接不能被序列化的异常。 01-Nov-2017...

3869

扫码关注云+社区

领取腾讯云代金券