Spark2.x学习笔记:12、Shuffle机制

12、Shuffle机制

12.1 背景

在MapReduce计算框架中,shuffle是连接Map和Reduce之间的桥梁。

Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

12.2 Spark Shuffle

Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑。 为什么Spark计算框架也需要Shuffle机制呢?我们知道,Spark计算框架是在分布式环境下运行,这就意味着不可能在单个进程空间中容纳所有计算需要的数据。数据需要按照key进行分区,然后打散分布到集群的各个进程的存储空间中。这里存在一个问题,不存在唯一的分区方式 满足所有的算子计算要求。比如对数据进行排序时,就需要重新按照一定规则对数据重新分区。Shuffle就是包裹在各种需要重新分区的算子之下的一个对数据集重新分区组合的过程。 由于重新分区需要知道分区规则,而分区规则按照数据的Key通过映射函数进行划分,由数据确定出key的过程就是Map过程。同时Map过程也可以做数据处理,例如join算法中一种经典算法Map Side Join,就是确定数据该放到哪个分区。Shuffle将数据进行收集分配到指定的Reduce分区

每个Reduce Task需要从每个MapTask读取一部分数据,则网络连接数是:M*R,其中M是MapTask数,R是Reduce Task数。也就是Shuffle产生文件数M*N。所以说Shuffle是分布式计算框架的核心数据交换方式,其实现方式直接决定了计算框架的性能和扩展性。

产生Shuffle的算子:join,cogroup,各类ByKey(reduceByKey,groupByKey,sortByKey)。

在Spark中,每个Action算子(Action算子才是真正执行)都转换为一个Job,每个Job转换为一个有向无环图DAG来执行。DAG在每个Stage承接阶段做Shuffle过程,也就是说根据Shuffle划分了各个Stage。

Spark Shuffle可以分为两个阶段:Shuffle Write阶段和Shuffle Read阶段(也称为Shuffle Fetch ,包含了Aggregate)。

(1) Shuffle Write阶段 为了分析方便,假定每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。参见下面HashShuffleManager图示。

(2) shuffle read阶段 shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task为下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

12.3 ShuffleManager

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。 (1)HashShuffleManager 在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,当前stage的task数乘以下一个stage的task数,也就是上面说的M*R。

(2)SortShuffleManager 因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

那么使用SortShuffleManager的Shuffle文件数是Core*R,Core是CPU核心数,我们可以认为Core是常量。这样Shuffle文件数大大减少。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

1 条评论
登录 后参与评论

相关文章

来自专栏人工智能LeadAI

TensorFlow遇上Spark

TensorFlowOnSpark 项目是由Yahoo开源的一个软件包,实现TensorFlow集群服务部署在Spark平台之上。 大家好,这次我将分享Tens...

3697
来自专栏吉浦迅科技

DAY62:阅读Glossary

我们正带领大家开始阅读英文的《CUDA C Programming Guide》,今天是第62天,我们正在讲解CUDA C语法,希望在接下来的38天里,您可以学...

803
来自专栏小小挖掘机

Spark作业基本运行原理解析!

我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。提交作业的节点称为Master节点,Driver进程就是...

721
来自专栏蓝天

Linux下select调用引发的血案

Select函数使用简单,其工作原理大家通常也知道,但是在实际的使用过程中可能并没有严格遵守,而且确实也比较难以完全遵守,除非不使用它。

652
来自专栏程序你好

Apache Spark大数据处理 - 性能分析(实例)

今天的任务是将伦敦自行车租赁数据分为两组,周末和工作日。将数据分组到更小的子集进行进一步处理是一种常见的业务需求,我们将看到Spark如何帮助我们完成这项任务。

1013
来自专栏吉浦迅科技

DAY74:阅读Runtime

我们正带领大家开始阅读英文的《CUDA C Programming Guide》,今天是第73天,我们正在讲解CUDA 动态并行,希望在接下来的27天里,您可以...

341
来自专栏Albert陈凯

3.5RDD的容错机制

3.5 RDD的容错机制 RDD实现了基于Lineage的容错机制。RDD的转换关系,构成了compute chain,可以把这个compute chain认...

2718
来自专栏美团技术团队

【技术博客】Spark性能优化指南——高级篇

前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为《Spark性能优化指南》的高级篇,将深入分析数据倾斜调优与shuffle...

3545
来自专栏大魏分享(微信公众号:david-share)

AIX 下磁盘 I/O 性能分析

磁盘 I/O 的概念 I/O的概念,从字义来理解就是输入输出。操作系统从上层到底层,各个层次之间均存在 I/O。比如,CPU 有 I/O,内存有 I/O, VM...

2899
来自专栏美图数据技术团队

Spark Streaming | Spark,从入门到精通

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,...

361

扫码关注云+社区