Mapreduce shuffle详解

Mapreduce shuffle详解

Mapreduce确保每个reducer的的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入 传给reducer)成为shuffle。从多个方面来看shuffle是mapreduce的心脏,是奇迹发生的地方。

上图展示了,mapreduce的详细过程。

1 输入分片

对于数据的输入分片,要根据不同的存储格式有不同的介绍。对于,hdfs存储的文件,数据的分片就可分为两种,文件可切分(不压缩或者压缩格式bzip2)的按照一定大小进行分片有既定算法,默认是block的大小,具体算法不在这里细讲,前面hive调优的文章又说到,而且浪尖也会在后续的文章提到这个内容;

分片的时候计算公式计算过程举例

文件不可切分则一个文件一个分片。

2 Map端

从上图我们可以看到map端的处理过程。Map会读取输入分片数据。但是map函数开始产生输出时并不是简单的将数据写入磁盘。这个过程很复杂,他利用了缓冲的方式写到内存并出于效率的考虑进行排序。

每个map任务都是有一个环形缓冲区的用于存储任务的输出。在默认情况下,缓冲区的大小为100MB,辞职可以通过改变io.sort.mb来调整。一旦缓冲内容达到阈值(io.sort,spill,percent,默认是0.8),一个后台线程会将内容spill到磁盘。在spill到磁盘的过程中,map输出并不会停止往缓冲区写入数据,但如果在此期间缓冲区被写满,map会被阻塞知道写磁盘过程完成。

溢出写过程安装轮询方式将缓冲区的内容写到mapred.local.dir指定的作业特定子目录中的目录中。

写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区。在每个分区中后台线程按键进行内排序,如果有一个combiner,它就在排序后的输出上运行。运行combinner使得map输出结果更紧凑,因此可以减少写到磁盘的数据和传递给reducer的数据。

每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在map任务写完其最后一个输出记录之后,会有几个溢写文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认是10。

如果至少存在3个溢出文件(通过min.num.spills.for.combine属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。前面曾说过,combiner可以在输入上反复运行,单不影响最终的结果。如果只有一两个溢出文件,那么对map输出的减少不值得调用combiner,就不会为map输出再次运行combiner。

在将压缩map输出写到磁盘的过程中对它进行压缩往往是个好主意,因为这样就会写磁盘的速度更快,更加节约时间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,但是只要将mapred.compress.map.output设置为true,就可以启用这个功能。使用的压缩库由mapred.map.output.compression.codec指定。

Reducer是通过HTTP的方式得到输出文件的分区。在MRV2中使用netty进行数据传输,默认情况下netty的工作线程数是处理器数的2倍。MRV1中,默认值是40,由tracker.http.threads来在tasktracker端设定。

3 Reducer端

集群中往往一个mr任务会有若干map任务和reduce任务,map任务运行有快有慢,reduce不可能等到所有的map任务都运行结束再启动,因此只要有一个任务完成,reduce任务就开始复制器输出。复制线程的数量由mapred.reduce.parallel.copies属性来改变,默认是 5。

Reducer如何知道map输出的呢?对于MRv2 map运行结束之后直接就通知了appmaster,对于给定的job appmaster是知道map的输出和host之间的关系。在reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问master 关于map的输出。Reduce并不会在获取到map输出之后就立即删除hosts,因为reduce有肯能运行失败。相反,是等待appmaster的删除消息来决定删除host。

Reduce对map输出的不同大小也有相应的调优处理。如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),否则,map输出会被复制到磁盘。一旦内存缓冲区达到阈值(由mapred.job.shuffle.merge.percent决定)或达到map的输出阈值(mapred.inmem.merge,threshold控制),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它已降低写入磁盘的数据量。

随着磁盘上副本的增多,后台线程会将它们合并为更大的,排序好的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过map任务)都必须在内存中解压缩。

复制完所有的map输出后,reduce任务进入排序阶段(更加恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map的输出,维持其顺序排序。这是循环进行的。比如,有50个map输出,而合并因子是10(默认值是10,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。

在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件最为最后一趟。最后的合并可以来自内存和磁盘片段。

在reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。

注意:

每趟合并的文件数实际上比上面例子中展示的有所不同的。目标是合并最小数据量的文件以便满足最后一趟的合并系数。因此,如果有40个文件,我们不会再四趟中每趟合并10个文件而得到4个文件。相反,第一趟只合并4个文件,随后的三塘合并10个文件。最后一趟中,4个已经合并的文件和剩余的6个文件合计是个文件进行合并。如下图所述:

注意这并没有改变合并的次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到reduce。

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-02-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏逆向技术

远程线程注入

一丶远程线程注入的讲解 远程线程注入的原理,我会写一个远程线程开发的例子 我们总共需要几步 /*1.查找窗口,获取窗口句柄*/ /*2.根据...

266100
来自专栏张善友的专栏

.net 2.0 你是如何使用事务处理?

     事务处理作为企业级开发必备的基础设施, .net 2.0通过System.Transactions对事务提供强大的支持.你还是在使用.net 1.x下...

22760
来自专栏牛肉圆粉不加葱

Spark Task 的执行流程② - 创建、分发 Task

task 的创建本应该放在分配 tasks 给 executors一文中进行介绍,但由于创建的过程与分发及之后的反序列化执行关系紧密,我把这一部分内容挪到了本文...

9810
来自专栏用户2442861的专栏

修改npm全局安装模式的路径

刚学nodeJS不久,很纳闷为什么全局安装的模块在 'node安装目录/node_modules‘ 中没找到!后来仔细看了下安装成功后的信息,才发现原来是自动安...

11220
来自专栏逸鹏说道

Owin:“System.Reflection.TargetInvocationException”类型的未经处理的异常

异常汇总:http://www.cnblogs.com/dunitian/p/4523006.html#signalR Owin:“System.Reflect...

40050
来自专栏gaoqin31

设计模式之 策略模式

定义 :封装了一些列算法,它们之前可以相互替换,此模式使得算法的改变,不会影响到使用它们的客户端

13530
来自专栏塔奇克马敲代码

第 8 章 IO库

20450
来自专栏同步博客

Memcache存储机制与指令汇总

  memcached是高性能的分布式内存缓存服务器。一般的使用目的是,通过缓存数据库查询结果,减少数据库访问次数,以提高动态Web应用的速度、提高可扩展性。

10320
来自专栏生信技能树

把rstudio的project或者package同步到自己的GitHub

然后rstudio的git/svn需要设置好秘钥连接到自己的GitHub, 参考生信菜鸟团博客教程:http://www.bio-info-trainee.co...

11930
来自专栏决胜机器学习

数据库专题(五) ——Memcached技术

数据库专题(五)——Memcached技术 (原创内容,转载请注明来源,谢谢) 一、Slab分配算法保存数据 Memcached默认只能用1M...

28750

扫码关注云+社区

领取腾讯云代金券