前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Mapreduce shuffle详解

Mapreduce shuffle详解

作者头像
Spark学习技巧
发布2018-03-20 15:09:20
1.3K0
发布2018-03-20 15:09:20
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Mapreduce shuffle详解

Mapreduce确保每个reducer的的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入 传给reducer)成为shuffle。从多个方面来看shuffle是mapreduce的心脏,是奇迹发生的地方。

上图展示了,mapreduce的详细过程。

1 输入分片

对于数据的输入分片,要根据不同的存储格式有不同的介绍。对于,hdfs存储的文件,数据的分片就可分为两种,文件可切分(不压缩或者压缩格式bzip2)的按照一定大小进行分片有既定算法,默认是block的大小,具体算法不在这里细讲,前面hive调优的文章又说到,而且浪尖也会在后续的文章提到这个内容;

分片的时候计算公式计算过程举例

文件不可切分则一个文件一个分片。

2 Map端

从上图我们可以看到map端的处理过程。Map会读取输入分片数据。但是map函数开始产生输出时并不是简单的将数据写入磁盘。这个过程很复杂,他利用了缓冲的方式写到内存并出于效率的考虑进行排序。

每个map任务都是有一个环形缓冲区的用于存储任务的输出。在默认情况下,缓冲区的大小为100MB,辞职可以通过改变io.sort.mb来调整。一旦缓冲内容达到阈值(io.sort,spill,percent,默认是0.8),一个后台线程会将内容spill到磁盘。在spill到磁盘的过程中,map输出并不会停止往缓冲区写入数据,但如果在此期间缓冲区被写满,map会被阻塞知道写磁盘过程完成。

溢出写过程安装轮询方式将缓冲区的内容写到mapred.local.dir指定的作业特定子目录中的目录中。

写磁盘之前,线程首先根据数据最终要传的reducer把数据划分成相应的分区。在每个分区中后台线程按键进行内排序,如果有一个combiner,它就在排序后的输出上运行。运行combinner使得map输出结果更紧凑,因此可以减少写到磁盘的数据和传递给reducer的数据。

每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在map任务写完其最后一个输出记录之后,会有几个溢写文件。在任务完成之前,溢出文件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着一次最多能合并多少流,默认是10。

如果至少存在3个溢出文件(通过min.num.spills.for.combine属性设置)时,则combiner就会在输出文件写到磁盘之前再次运行。前面曾说过,combiner可以在输入上反复运行,单不影响最终的结果。如果只有一两个溢出文件,那么对map输出的减少不值得调用combiner,就不会为map输出再次运行combiner。

在将压缩map输出写到磁盘的过程中对它进行压缩往往是个好主意,因为这样就会写磁盘的速度更快,更加节约时间,并且减少传给reducer的数据量。在默认情况下,输出是不压缩的,但是只要将mapred.compress.map.output设置为true,就可以启用这个功能。使用的压缩库由mapred.map.output.compression.codec指定。

Reducer是通过HTTP的方式得到输出文件的分区。在MRV2中使用netty进行数据传输,默认情况下netty的工作线程数是处理器数的2倍。MRV1中,默认值是40,由tracker.http.threads来在tasktracker端设定。

3 Reducer端

集群中往往一个mr任务会有若干map任务和reduce任务,map任务运行有快有慢,reduce不可能等到所有的map任务都运行结束再启动,因此只要有一个任务完成,reduce任务就开始复制器输出。复制线程的数量由mapred.reduce.parallel.copies属性来改变,默认是 5。

Reducer如何知道map输出的呢?对于MRv2 map运行结束之后直接就通知了appmaster,对于给定的job appmaster是知道map的输出和host之间的关系。在reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问master 关于map的输出。Reduce并不会在获取到map输出之后就立即删除hosts,因为reduce有肯能运行失败。相反,是等待appmaster的删除消息来决定删除host。

Reduce对map输出的不同大小也有相应的调优处理。如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比),否则,map输出会被复制到磁盘。一旦内存缓冲区达到阈值(由mapred.job.shuffle.merge.percent决定)或达到map的输出阈值(mapred.inmem.merge,threshold控制),则合并后溢出写到磁盘中。如果指定combiner,则在合并期间运行它已降低写入磁盘的数据量。

随着磁盘上副本的增多,后台线程会将它们合并为更大的,排序好的文件。这会为后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过map任务)都必须在内存中解压缩。

复制完所有的map输出后,reduce任务进入排序阶段(更加恰当的说法是合并阶段,因为排序是在map端进行的),这个阶段将合并map的输出,维持其顺序排序。这是循环进行的。比如,有50个map输出,而合并因子是10(默认值是10,由io.sort.factor属性设置,与map的合并类似),合并将进行5趟。每趟将10个文件合并成一个文件,因此最后有5个中间文件。

在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘往返行程,并没有将这5个文件合并成一个已排序的文件最为最后一趟。最后的合并可以来自内存和磁盘片段。

在reduce阶段,对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。

注意:

每趟合并的文件数实际上比上面例子中展示的有所不同的。目标是合并最小数据量的文件以便满足最后一趟的合并系数。因此,如果有40个文件,我们不会再四趟中每趟合并10个文件而得到4个文件。相反,第一趟只合并4个文件,随后的三塘合并10个文件。最后一趟中,4个已经合并的文件和剩余的6个文件合计是个文件进行合并。如下图所述:

注意这并没有改变合并的次数,它只是一个优化措施,目的是尽量减少写到磁盘的数据量,因为最后一趟总是直接合并到reduce。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-02-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档