前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark2.x学习笔记:12、Shuffle机制

Spark2.x学习笔记:12、Shuffle机制

作者头像
程裕强
发布2018-01-02 16:43:45
1.1K1
发布2018-01-02 16:43:45
举报

12、Shuffle机制

12.1 背景

在MapReduce计算框架中,shuffle是连接Map和Reduce之间的桥梁。

Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

Shuffle的本义是洗牌、混洗,把一组有一定规则的数据尽量转换成一组无规则的数据,越随机越好。MapReduce中的Shuffle更像是洗牌的逆过程,把一组无规则的数据尽量转换成一组具有一定规则的数据。

为什么MapReduce计算模型需要Shuffle过程?我们都知道MapReduce计算模型一般包括两个重要的阶段:Map是映射,负责数据的过滤分发;Reduce是规约,负责数据的计算归并。Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。

12.2 Spark Shuffle

Spark作为MapReduce框架的一种实现,自然也实现了shuffle的逻辑。 为什么Spark计算框架也需要Shuffle机制呢?我们知道,Spark计算框架是在分布式环境下运行,这就意味着不可能在单个进程空间中容纳所有计算需要的数据。数据需要按照key进行分区,然后打散分布到集群的各个进程的存储空间中。这里存在一个问题,不存在唯一的分区方式 满足所有的算子计算要求。比如对数据进行排序时,就需要重新按照一定规则对数据重新分区。Shuffle就是包裹在各种需要重新分区的算子之下的一个对数据集重新分区组合的过程。 由于重新分区需要知道分区规则,而分区规则按照数据的Key通过映射函数进行划分,由数据确定出key的过程就是Map过程。同时Map过程也可以做数据处理,例如join算法中一种经典算法Map Side Join,就是确定数据该放到哪个分区。Shuffle将数据进行收集分配到指定的Reduce分区

每个Reduce Task需要从每个MapTask读取一部分数据,则网络连接数是:M*R,其中M是MapTask数,R是Reduce Task数。也就是Shuffle产生文件数M*N。所以说Shuffle是分布式计算框架的核心数据交换方式,其实现方式直接决定了计算框架的性能和扩展性。

产生Shuffle的算子:join,cogroup,各类ByKey(reduceByKey,groupByKey,sortByKey)。

在Spark中,每个Action算子(Action算子才是真正执行)都转换为一个Job,每个Job转换为一个有向无环图DAG来执行。DAG在每个Stage承接阶段做Shuffle过程,也就是说根据Shuffle划分了各个Stage。

Spark Shuffle可以分为两个阶段:Shuffle Write阶段和Shuffle Read阶段(也称为Shuffle Fetch ,包含了Aggregate)。

(1) Shuffle Write阶段 为了分析方便,假定每个Executor只有1个CPU core,也就是说,无论这个Executor上分配多少个task线程,同一时间都只能执行一个task线程。

shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。参见下面HashShuffleManager图示。

(2) shuffle read阶段 shuffle read,通常就是一个stage刚开始时要做的事情。此时该stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task为下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。以此类推,直到最后将所有数据到拉取完,并得到最终的结果。

12.3 ShuffleManager

在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。 (1)HashShuffleManager 在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

这里写图片描述
这里写图片描述

那么每个执行shuffle write的task,要为下一个stage创建多少个磁盘文件呢?很简单,当前stage的task数乘以下一个stage的task数,也就是上面说的M*R。

(2)SortShuffleManager 因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

这里写图片描述
这里写图片描述

那么使用SortShuffleManager的Shuffle文件数是Core*R,Core是CPU核心数,我们可以认为Core是常量。这样Shuffle文件数大大减少。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017-10-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 12、Shuffle机制
    • 12.1 背景
      • 12.2 Spark Shuffle
        • 12.3 ShuffleManager
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档