前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Shuffle机制

Spark Shuffle机制

作者头像
伊泽瑞尔
发布2022-06-01 08:38:37
5610
发布2022-06-01 08:38:37
举报

一、Shuffle机制

在MapReduce框架中,Shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过Shuffle这个环节,Shuffle的性能高低直接影响了整个程序的性能和吞吐量。Spark作为MapReduce框架的一种实现,自然也实现了Shuffle的逻辑。对于大数据计算框架而言,Shuffle阶段的效率是决定性能好坏的关键因素之一。

二、什么是Shuffle

Shuffle是MapReduce框架中的一个特定的阶段,介于Map阶段和Reduce阶段之间,当Map的输出结果要被Reduce使用时,输出结果需要按关键字值(key)哈希,并且分发到每一个Reducer上,这个过程就是Shuffle。直观来讲,Spark Shuffle机制是将一组无规则的数据转换为一组具有一定规则数据的过程。由于Shuffle涉及了磁盘的读写和网络的传输,因此Shuffle性能的高低直接影响整个程序的运行效率。

在MapReduce计算框架中,Shuffle连接了Map阶段和Reduce阶段,即每个Reduce Task从每个Map Task产生的数据中读取一片数据,极限情况下可能触发M*R个数据拷贝通道(M是Map Task数目,R是Reduce Task数目)。通常Shuffle分为两部分:Map阶段的数据准备和Reduce阶段的数据拷贝。

首先,Map阶段需根据Reduce阶段的Task数量决定每个Map Task输出的数据分片数目,有多种方式存放这些数据分片:

  • 保存在内存中或者磁盘上(Spark和MapReduce都存放在磁盘上)。
  • 每个分片对应一个文件(现在Spark采用的方式,以前MapReduce采用的方式),或者所有分片放到一个数据文件中,外加一个索引文件记录每个分片在数据文件中的偏移量(现在MapReduce采用的方式)。

因此可以认为Spark Shuffle与Mapreduce Shuffle的设计思想相同,但在实现细节和优化方式上不同。

spark的shuffleManager是负责shuffle过程的执行、计算和处理的组件。shuffleManager是trait,Spark中有两种Shuffle管理类型,HashShuffleManager和SortShuffleManager,Spark1.2之前是HashShuffleManager,Spark1.2引入SortShuffleManager,在Spark2.0+版本中已经将HashShuffleManager丢弃。

三、HashShuffleManager

shuffle write阶段,默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据。

这种方式操作数据简单,但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数据)大数据的随机磁盘I/O操作且会形成大量的Memory(极易造成OOM)。所以产生了以下两个问题:

  • 不能够处理大规模的数据。
  • Spark不能够运行在大规模的分布式集群上。

改进方案:通过设置spark.shuffle.consolidateFiles 该参数默认值为false,将其设置为true即可开启优化的Consolidate机制。

Consolidate机制来将Shuffle时候产生的文件数量减少到C*R个(C代表在Mapper端,同时能够使用的cores数量,R代表Reducer中所有的并行任务数量)。但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,也有打开过多文件的问题。Consolidate并没有降低并行度,只是降低了临时文件的数量,此时Mapper端的内存消耗就会变少,所以OOM也就会降低,另外一方面磁盘的性能也会变得更好。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

从理论上讲Shuffle consolidation产生的Shuffle文件数量为C×R,其中C是Spark集群的core number, R是Reducer的个数。

四、SortShuffleManager

HashShuffle写数据的时候,内存有一个bucket缓冲区,同时在本地磁盘有对应的本地文件,如果本地有文件,那么在内存应该也有文件句柄也是需要耗费内存的。也就是说,从内存的角度考虑,即有一部分存储数据,一部分管理文件句柄。如果Mapper分片数量为1000,Reduce分片数量为1000,那么总共就需要1000000个小文件。所以就会有很多内存消耗,频繁IO以及GC频繁或者出现内存溢出。而且Reducer端读取Map端数据时,Mapper有这么多小文件,就需要打开很多网络通道读取,很容易造成Reducer(下一个stage)通过driver去拉取上一个stage数据的时候,说文件找不到,其实不是文件找不到而是程序不响应,因为正在GC。

为了缓解Shuffle过程产生文件数过多和Writer缓存开销过大的问题,spark引入了类似于hadoop Map-Reduce的shuffle机制。该机制每一个ShuffleMapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。

SortShuffle的运行机制主要分成两种:普通运行机制和bypass运行机制。

普通运行机制:

执行流程:

  1. map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M。
  2. 在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。
  3. 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
  4. 在溢写之前内存结构中的数据会进行排序分区。
  5. 然后开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是1万条数据。
  6. map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。
  7. reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。

总结:这种机制产生磁盘小文件的个数:2*M(map task的个数)。

byPass运行机制:

bypass运行机制的触发条件如下:

1.shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认是200)。

这个参数仅适用于SortShuffleManager,如前所述,SortShuffleManager在处理不需要排序的Shuffle操作时,由于排序带来性能的下降。这个参数决定了在这种情况下,当Reduce分区的数量小于多少的时候,在SortShuffleManager内部不使用Merge Sort的方式处理数据,而是与Hash Shuffle类似,直接将分区文件写入单独的文件,不同的是,在最后一步还是会将这些文件合并成一个单独的文件。这样通过去除Sort步骤来加快处理速度,代价是需要并发打开多个文件,所以内存消耗量增加,本质上是相对HashShuffleMananger一个折衷方案。这个参数的默认值是200个分区,如果内存GC问题严重,可以降低这个值。

2.不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

总结:这种机制产生的磁盘的小文件为:2*M(map task的个数)。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-11-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据与知识图谱 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Shuffle机制
  • 二、什么是Shuffle
  • 三、HashShuffleManager
  • 四、SortShuffleManager
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档