专栏首页简单聊聊SparkSpark内核分析之Shuffle操作流程(非常重要)

Spark内核分析之Shuffle操作流程(非常重要)

        如题,我们来分析一下spark的shuffle操作原理;为什么说其非常重要,是因为shuffle操作是我们在Spark调优中非常重要的一环,对shuffle进行了优化,往往可以使得我们的spark程序运行效率有极大的提升。依照惯例,我们先来看一张图;

普通shuffle流程图

上图是一个普通的Shuffle操作流程原理图,一个shuffle操作由三个RDD算子构成,分别是mapPartitionsRDD,ShuffleRDD,mapPartitionsRDD;如上图所示,

1.每个ShuffleMapTask都会为每个ResultTask创建一个Bucket缓存和一个对应的ShuffleBlockFile磁盘文件;2.每个ShuffleMapTask的输出相关信息封装成一个MapStatus发送到DAGScheduler的MapOutputTracker中去; 3.ResultTask开始拉取该任务所需要的数据,ResultTask通过向DAGScheduler的MapOutputTracker获取MapStatus的信息,从而知道自己需要的数据所在的位置,然后去相应的位置拉去数据到该任务所在节点的内存中,如果内存不够,会将部分数据写入磁盘,完成这系列的操作是由ShufflerRDD算子完成的; 4.然后ResultTask对拉取到的数据进行聚合操作,最后生成mapPartitionsRDD算子;

想想上面的这个Shuffler流程会有什么问题?

我们来做一个假设,如果有100个ShuffleMapTask,2个CPU Core,100个ResultTask,那么这个shuffler操作将产生10000个文件,如此多的文件对于Spark作业的性能就是一个灾难;针对这个问题当然有对应的优化策略,接着我们来看另外一张图;

优化的Shuffler流程图

        通过优化的Shuffler操作如上图所示,假设有100个ShufflerMapTask,2CPU core,100个Resulttask,优化后产生的中间文件是200个,是优化之前的1/50;那么这是如何做到的,通过阅读源码可以知道,只要引入consolidation机制就可以实现了,其配置是通过在SparkConf中配置对应的参数即可实现;

        来简单分析一下:一个ShuffleMapTask将数据写入本地不变,但是当这一批ShuffleMapTask运行完成以后,下一批ShuffleMapTask开始运行(一批ShuffleMapTask是指,同一时间有两个Task并行执行,因为有两个CPU Core),它们产生的数据会直接写入上一批ShuffleMapTask产生的本地文件中;上图中左边的一组可以称为一组ShuffleGroup,每个文件中都存储了多个ShuffleMapTask的数据,每个ShuffleMapTask所产生的数据是一个segment,每个File中通过索引,偏移量来标记每部分数据来自不同的ShuffleMapTask。

下面我们来看看源码是如何实现的;

ShuffleMapTask的runTask

1.首先通过Spark全局变量得到shuffleManager对象,并通过shuffleManager对象获得Write对象;

2.接着,通过rdd.iterator方法对属于自己的partition进行计算,最后会调用我们自己编写的RDD算子来计算partition;

3.接着Writer调用自己的write方法将RDD算子计算的结果写入缓存;

HashShuffleWriter的write

1.判断aggregator为true,并且是否设置了map端的combine操作;若成立,则进行map端的数据合并(这里是一个spark优化点,在我之前关于spark优化系列文章中有写过);

2.对所有经过合并操作之后的数据遍历,根据每个元素获得对应的bucketId,然后将改元素写入对应的bucket缓存中;

这里我们来看看这个shuffle对象做了什么?

FileShuffleBlockManager的forMapTask

1.首先创建出一个ShuffleWriterGroup对象;

2.接着判断Spark作业是否设置了consolidateShuffleFiles;如果设置其为true,首先得到一个fileGroup对象,然后使用shuffleId,mapId,BucketId来得到一个blockId,接着根据这个blockId写数据到磁盘的对象;相反,如果没有设置consolidateShuffleFiles为true,则直接为每个shuffleMapTask创建一个blockFile,然后得到一个写数据到磁盘的对象;

3.执行完这里后,接着调用write方法将数据写入内存缓冲bucket,然后再将数据写入磁盘;

写数据到这里就完成了,然后会将产生的数据位置等信息封装成一个MapStatus对象发送给DAGSchedule的MapOutputTracker中;接下来ResultTask开始读取数据;

ShuffleRDD的compute

HashShuffleReader的read

BlockStoreShuffleFetcher的fetch

BlockStoreShuffleFetcher的fetch

总结:到此shuffle的整个操作流程就分析完了,接下来会分析底层数据存储的核心组件BlockManager的工作原理,,欢迎关注。

如需转载,请注明:

本篇:Spark内核分析之Shuffle操作流程(非常重要)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Kafka系列文章第1篇之Kafka是什么

    如果有幸目睹过系统从零到一的演变过程,大家估计都会有一种感叹,就是随着业务复杂度和流量的不断上升,系统变得越来越难以维护,面对高额的维护成本,攻城师们不得不对现...

    z小赵
  • Spark内核分析之Worker原理分析

            接着上篇的Schedule调度内容,本篇我们来看看Driver,Application向Worker发送launch以后到底发生了什么。先来看看...

    z小赵
  • Spark性能调优九之常用算子调优

            前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进...

    z小赵
  • 2月27日数据动态早报|互联网银行未来唯一的核心竞争力就是获取数据源的能力

    数据动态,让您了解数据新变化、新创造和新价值。 ? ---- 一、通信行业数据动态 1 广东电信春节大数据出炉 深圳空城指数最高、出境游日本最热。广东电信日...

    陆勤_数据人网
  • 什么是大数据?你需要知道的…..

    我们每天都在吃饭,睡觉,工作,玩耍,与此同时产生大量的数据。根据IBM调研的说法,人类每天生成2.5亿(250亿)字节的数据。 这相当于一堆DVD数据从地球到月...

    华章科技
  • hadoop大数据典型应用,基于Hadoop技术的大数据应用解决方案,湖北大数据平台,数道云

    近年来,Hadoop技术,大数据研发产品在国内迅猛发展,其在不断的发展中解决了传统数据库无法胜任海量数据处理的问题,以及结构化和非结构化数据统一起来进行数据分析...

    数道云大数据
  • 大数据(生于2006,卒于2019)已死!

    由于关注的重心从我们收集数据的方式转向实时处理数据,大数据时代即将终结。大数据现在是支持多云、机器学习和实时分析这几个新时代的业务资产。

    钱塘数据
  • 大数据(生于2006,卒于2019)已死!

    由于关注的重心从我们收集数据的方式转向实时处理数据,大数据时代即将终结。大数据现在是支持多云、机器学习和实时分析这几个新时代的业务资产。

    Spark学习技巧
  • 我们急需三维激光数据的语义分割吗?

    今天给大家分享一篇论文,论文名称: Are We Hungry for 3D LiDAR Data for Semantic Segmentation? 作者...

    用户1150922
  • Docker基础(二)

    数据卷是一个可供容器使用的特殊目录,它将主机操作系统目录直接映射进容器,类似Linux中的mount行为。

    Java阿呆

扫码关注云+社区

领取腾讯云代金券