学习
实践
活动
工具
TVP
写文章

hadoop学习笔记五:MapReduce详解

上面一篇讲到了MapReduce的简单介绍,在处理过程中其实还有一个重要的概念:Shuffle,下面会详细介绍一下MapReduce的处理过程。

处理过程详细划分

直译为洗牌/弄乱;这里面主要的是进行洗牌,也就是对数据的进行归纳合并;表示的是从map task输出到reduce task输入的这段过程。

input -> map() -> shuffle -> reduce() -> output

1)输入分片(input split)

input split,在运行mapreduce程序的时候,需要确定存储在hdfs上的文件被逻辑对的划分为了多少个split,每个split数据让一个map task进行处理。

2)map task任务

在map()中对split输入的数据进行处理

3)shuffle

1.存储

把map()处理后的数据,放入环形内存缓冲区;当环形内存缓冲区达到一定的大小时,通过spill溢写写入到本地磁盘,如果有combine会进行执行

2.合并(merge)与排序(sort)

区分输出到哪个reduce()进行合并文件 --》2)对合并后的数据进行排序 (如果有combine会执行)(进行压缩,编程中可在配置)

以上属于map阶段的shuffle

3.拷贝

从map()输出的数据目录拷贝自己需要的内容到环形内存缓冲区;当环形内存缓冲区达到一定的大小时,写入到本地磁盘。合并后继续排序

4.合并(merge)与排序(sort)

拷贝过来的文件内容进行合并与排序

5.分组(grouping)**** 重要

将相同key的value进行合并,value是一个集合类型,并输出到reduce()

以上属于reduce阶段的shuffle

4)reduce task任务

在reduce()中对shuffle阶段

5)输出到hdfs中

1.输入分片(InputFormat)

了解输入分片的时候需要先了解HDFS上文件的存储:是通过块(Block)进行存储的,每个Block的默认大小是128M。

为了保证MapTask处理数据时的任务本地性,MapTask会运行在所处理数据的这台服务器上;由MRAppMaster来进行决定:首先获取MapTask所处理数据的Block的存储位置,然后判断Block的机器是否有足够的资源运行MapTask任务,最后将MapTask运行在Block所在的机器上,如果没有那么会运行在与block同一机架的服务器上。

分片:

读取HDFS文件数据后会根据文件的大小来获取分片的个数,每个分片对应的是一个Mapper;即分片是决定处理这个文件会产生多少个Mapper。

计算方法:

1.文件的个数;

2.文件的大小;

每个文件默认会切割为1个分片,每个文件存储的block也会切割为1个分片。在FileInputFormat源码的getSplits()方法:

我们可以看到,minSize,maxSize和blockSize。其中minSize是根据配置文件的mapreduce.input.fileinputformat.split.minsize参数(默认参数值为0)和FileIntputFormat默认的minSize(值为1)的最小值来决定的;maxSize是根据配置文件的mapreduce.input.fileinputformat.split.maxsize参数(默认没有这个参数会取Long.MAX_VALUE)来决定的;blockSize文件存储block的数量。计算方法:maxSize 和 blockSize 的最小值 和 minSize 的比较取最大值。例如:3个文件大小分别为 50m、150m、256m,那么就是第一个文件1个分片,第二个文件大于128*1 小于128*2 那么就是2个分片,第三个文件等于128*2 那么就是2个分片,总共为1+2+2=5个分片,即生成5个Mapper来处理。

InputFormat抽象类的主要方法 getSplits 获取分片的信息 和 createRecordReader 读取

2.Mapper阶段

用户自定义的处理数据逻辑,根据输入的数据进行业务处理后进行输出,输出之后进行Shuffle处理。对于 MapReduce程序来说,默认的时候,Mapper输出的Key/Value数据类型与Reducer输出(Job输出)的Key/Value数据类型一致,分别为 Key:LongWritable value: Text。

注意:MapReduce Job 不一定要有reduce 但是必须要有map

3.Shuffle阶段

Mapper阶段Shuffle

3.0环形缓冲区

map()的输出结果是由collector处理的,每个Map任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。这个数据结构其实就是个字节数组,叫Kvbuffer,名如其义,但是这里面不光放置了数据,还放置了一些索引数据,给放置索引数据的区域起了一个Kvmeta的别名,在Kvbuffer的一块区域上穿了一个IntBuffer(字节序采用的是平台自身的字节序)的马甲。数据区域和索引数据区域在Kvbuffer中是相邻不重叠的两个区域,用一个分界点来划分两者,分界点不是亘古不变的,而是每次Spill之后都会更新一次。初始的分界点是0,数据的存储方向是向上增长,索引数据的存储方向是向下增长Kvbuffer的存放指针bufindex是一直闷着头地向上增长,比如bufindex初始值为0,一个Int型的key写完之后,bufindex增长为4,一个Int型的value写完之后,bufindex增长为8。

索引是对在kvbuffer中的键值对的索引,是个四元组,包括:value的起始位置、key的起始位置、partition值、value的长度,占用四个Int长度,Kvmeta的存放指针Kvindex每次都是向下跳四个“格子”,然后再向上一个格子一个格子地填充四元组的数据。

Mapper中的Kvbuffer的大小可以通过 mapreduce.task.io.sort.mb 来设置(默认是100 单位是m)具体设置多少根据服务器的性能来决定。当达到 mapreduce.map.sort.spill.percent 值(默认0.8 即80%)的大小时会执行spill溢写到磁盘中。

3.1 spill溢写

第一步:分区partitioner

对于map()方法的输出的Key/Value,系统会给定一个partition,默认是对Key的hashCode以reduceTask的数量进行取模,决定这对Key/Value交给哪一个reduce Task进行处理。主要特点:一是负载均衡,尽可能将工作均匀的分配给不同的Reduce Task进行处理;二是效率,分配速度快。总的来说,partitioner是对Map Task输出的结果做了一次排序,也是整个MapReduce Task的第一次排序

如果没有设置,默认取的是HashPartitioner.class看源码中的partitioner的方法

第二步:排序sort

默认的根据key进行排序,可进行自定义排序的顺序,默认要继承WritableComparator接口,一般会在自定义Key数据类型的时候写内部类来实现。

第三步:合并combiner

combine也被称之为Mapper阶段的reduce端,继承Reducer Class。对map()方法输出的Key/Value相同Key的数据进行一次合并,减少输出到reduce端的数量。保证Combiner多次执行不会对数据造成影响,reduce的结果一样。如果不一样就不需要设置combiner了。

第四步:写入磁盘

写入磁盘可以通过mapreduce.map.output.compress(默认 false) 参数开启压缩。开启文件压缩可以减少存储空间、减少数据传输的网络IO。通常整个作业的输出用的是Snappy算法,map的输出使用的是lzo算法。不同的压缩算法:压缩比例、速度和使用CPU/内存的情况(主要)不同。针对于HDFS来说,存储的文件是进行压缩后存储的。在MapReduce Shuffle过程中 map与reduce的中间传输数据时进行压缩后传输的。

Reducer阶段Shuffle

3.2 拷贝到copy

从map()输出的数据目录中拷贝自己需要的内容到环形缓冲区;当环形缓冲区达到一定大小时,写入本地磁盘。由于map()有多个,所有reduce会从多个map()所在的服务器去copy数据,这个过程是通过HTTP请求来进行数据的拉取,在map()输出数据之后会通知MRAppMaster,reduce会周期性的去询问MRAppMaster是否有需要拉取的数据,直到所有的map()执行完成。map()输出的数据在整个作业完成之后才会删除掉。

3.3 排序sort

同Mapper阶段的排序一样,默认根据Key进行排序

3.4 分组grouping****重要

该过程是将相同Key的Value进行分组,每一个Key的Value都放入到一个集合中,如: —>

4.Reducer阶段

用户自定义的处理逻辑,根据接收到的数据处理并输出,Reducer的输出就是整个MapReduce Task作业的输出。一般来说,Reducer的输入和Mapper的输出是相同的。Reducer Number数目是可以指定的,方法如下

1.使用job进行设置

job.NumReduceTasks(0);

2.configuration中

3.mapreduce的xml中设置

mapreduce.job.task0

4.命令行设置

优先级: 命令行 > configuration > job > xml

5.OutputFormat输出分片

把Reducer阶段的输出数据,根据设定的输出类型,输出到hdfs或DB中。OutputFormat抽象类的主要方法是 RecordWriter 写入数据。

shuffle 相关优化参数

mapreduce.map(reduce).log.levelmap或reduce日志的级别

mapreduce.map(reduce).cpu.vcores每个map或reduce的cpu数量

TopKey程序

需求是根据给定的文件,选出出现次数前十的单词。思路是统计好单词的次数,放到一个集合中,如果大于10个,那么删除掉最小的单词,留下的10个就是出现次数最多的10个单词了。下面我们开始开动写代码。

依据MapReduce编程模版,编写MapReduce程序。

1.copy模版,更改类名称(MapReduce名称、Map名称、Reducer名称)

2.依据业务划分Mapper和Reducer类中的业务实现,确定map()/reduce()输入输出的Key/Value类型。(map输入 Key/Value类型、map输出Key/Value类型、reduce输入 Key/Value类型、reduce输出 Key/Value类型)

a.Mapper类 b.Reducer类 c.Job设置

3.依据业务逻辑分别在map()和reduce()方法里面编写程序

4.总体检查配置,确定无误后,进行单元测试(小数据量的本地测试),放到集群环境上,运行在YARN上,进行业务测试。

注意事项:如果错误信息中报TopKeyMapper.,就是Mapper没有指定为static。Mapper和Reducer编写时需要指定为static静态类。

Mapper类:

Reducer类:

github地址:https://github.com/PeaceBao/Project01

启用日志聚合功能

为了方便查看日志信息,需要启动日志聚合功能。编辑yarn-site.xml填上是否启动和保存到时间,通常是7天

yarn.log-aggregation-enable

true

yarn.log-aggregation.retain-seconds

604800

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180726G0743B00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券