大家好,我是不温卜火,昵称来源于成语—
不温不火
,本意是希望自己性情温和
。作为一名互联网行业的小白,博主写博客一方面是为了记录自己的学习过程,另一方面是总结自己所犯的错误希望能够帮助到很多和自己一样处于起步阶段的萌新。但由于水平有限,博客中难免会有一些错误出现,有纰漏之处恳请各位大佬不吝赐教!博客主页:https://buwenbuhuo.blog.csdn.net/
此系列主要为我的学弟学妹们所创作,在某些方面可能偏基础。如果读者感觉较为简单,还望见谅!如果文中出现错误,欢迎指正~
本文主要介绍了MapReduce编程模型详解及其编程实践,包括MapReduce入门与基础理论、MapReduce编程实践。
MapReduce思想在生活中处处可见,每个人或多或少都曾接触过这种思想。MapReduce的思想核心是“先分再合,分而治之”, 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。
这种思想来源于日常生活与工作时的经验,同样也完全适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
这两个阶段合起来正是MapReduce思想的体现。
一个比较形象的语言解释MapReduce:
我们要数停车场中的所有的车数量。你数第一列,我数第二列。这就是“Map”。我们人越多,能够同时数车的人就越多,速度就越快。
数完之后,我们聚到一起,把所有人的统计数加在一起。这就是“Reduce”。
🔍1. 什么是分布式计算
分布式计算是一种计算方法,和集中式计算是相对的。
随着计算技术的发展,有些应用需要非常巨大的计算能力才能完成,如果采用集中式计算,需要耗费相当长的时间来完成。
分布式计算将该应用分解成许多小的部分,分配给多台计算机进行处理。这样可以节约整体计算时间,大大提高计算效率。
🔍2.大数据场景下模拟实现
并行计算、分布式计算都是属于高性能计算(HPC)的范畴,主要目的在于对大数据的分析与处理,但它们却存在很多差异。
🔍1.并行计算(Parallel Computing)
并行计算又称平行计算是指一种能够让多条指令同时进行的计算模式,可分为时间并行和空间并行。时间并行即利用多条流水线同时作业,空间并行是指使用多个处理器执行并发计算,以降低解决复杂问题所需要的时间。
并行计算主要目的在于:
并行计算能快速解决大型且复杂的计算问题。此外还能利用非本地资源,节约成本,同时克服单个计算机上存在的存储器限制。
🔍2.分布式计算
分布式计算是一个需要非常巨大的计算能力才能解决的问题分成许多小的部分,然后把这些部分分配给许多计算机进行处理,最后把这些计算结果综合起来得到最终的结果。分布式计算和集中式计算相对应的概念。
分布式计算是在两个或多个软件互相共享信息,这些软件既可以在同一台计算机上运行,也可以在通过网络连接起来的多台计算机上运行。
它的算法具有以下几个优点:
🔍3.从解决对象上看,两者都是大任务化为多个小任务
🔍4.二者的不同之处则在于:
摩尔定律是英特尔创始人之一戈登·摩尔的经验之谈,其核心内容为:集成电路上可以容纳的晶体管数目在大约每经过18个月便会增加一倍。换言之,处理器的性能每隔两年翻一倍。
但是从2005年开始摩尔定律逐渐失效 ,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能
分布式程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力
谷歌公司最先提出了分布式并行编程模型MapReduce,Hadoop MapReduce是它的开源实现,后者比前者使用门槛低很多
但是其实早在MapReduce出现之前,已经有像MPI这样非常成熟的并行计算框架,但是其自身存在缺陷,所以Google才会推出MapReduce。下面我们可以看下MapReduce相较于传统的并行计算框架有哪些优势!如下表:
传统并行计算框架 | MapReduce | |
---|---|---|
集群架构/容错性 | 共享式,容错性差 | 非共享式,容错性好 |
硬件/价格/拓展性 | 刀片服务器、高速网、SAN,价格贵,拓展性差 | 普通PC机,便宜,拓展性好 |
编程/学习难度 | what-how,难 | what,简单 |
适用场景 | 实时、细粒度计算、计算密集型 | 批处理、非实时、数据密集型 |
MapReduce是Hadoop的一个模块,是一个分布式运算程序的编程框架。
对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,降低了开发并行应用的入门门槛。
Hadoop MapReduce构思体现在如下的三个方面。
🔍1.如何对付大数据处理
对相互间不具有计算依赖关系的大数据计算任务,实现并行最自然的办法就是采取MapReduce分而治之的策略。
也就是Map阶段分的阶段,把大数据拆分成若干份小数据,多个程序同时并行计算产生中间结果;然后是Reduce聚合阶段,通过程序对并行的结果进行最终的汇总计算,得出最终的结果。
并行计算的第一个重要问题是如何划分计算任务或者计算数据以便对划分的子任务或数据块同时进行计算。不可分拆的计算任务或相互间有依赖关系的数据无法进行并行计算!
🔍2.构建抽象模型
MapReduce借鉴了函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型。
Map: 对一组数据元素进行某种重复式的处理;
Reduce: 对Map的中间结果进行某种进一步的结果整理。
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)]
reduce: (k2; [v2]) → [(k3; v3)]
Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。通过以上两个编程接口,大家可以看出MapReduce处理的数据类型是<key,value>键值对。
关于MapReduce中的Map和Reduce函数如下表所示:
🔍3.统一架构、隐藏底层细节
如何提供统一的计算框架,如果没有统一封装底层细节,那么程序员则需要考虑诸如数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。
MapReduce最大的亮点在于通过抽象模型和计算框架把需要做什么(what need to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架。
程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码。如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用。
MapReduce最早由Google于2004年在一篇名为《MapReduce:Simplified Data Processingon Large Clusters》的论文中提出,把分布式数据处理的过程拆分为Map和Reduce两个操作函数(受到Lisp以及其他函数式编程语言的启发),随后被Apache Hadoop参考并作为开源版本提供支持。它的出现解决了人们在最初面临海量数据束手无策的问题,同时,它还是易于使用和高度可扩展的,使得开发者无需关系分布式系统底层的复杂性即可很容易的编写分布式数据处理程序,并在成千上万台普通的商用服务器中运行。
Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集)。
MapReduce是一种面向海量数据处理的一种指导思想,也是一种用于对大规模数据进行分布式计算的编程模型。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个Map任务并行处理
MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销
MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave。Master上运行JobTracker,Slave上运行TaskTracker
Hadoop框架是用Java实现的,但是,MapReduce应用程序则不一定要用Java来写
根据上述模型简介中,我们不难发现其中的优点与缺点,那么接下来我们分别对其进行探讨。
🔍1.优点
关于优点,我们在上文中,传统并行计算框架与MapReduce的计算中,已经了解过了,那么下面再次详细的介绍下:
🔍2.缺点
MapReduce虽然有很多的优势,但是也有它不擅长的。这里的“不擅长”,不代表不能做,而是在有些场景下实现的效果差,并不适合用MapReduce来处理,主要表现在以下结果方面:
🔍1. MapReduce框架体系
一个完整的mapreduce程序在分布式运行时有三类实例进程:
🔍2. MapReduce编程规范
MapReduce分布式的运算程序需要分成2个阶段,分别是Map阶段和Reduce阶段。Map阶段对应的是MapTask并发实例,完全并行运行。Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。
MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
用户编写的程序分成三个部分:Mapper
,Reducer
,Driver
(提交运行mr程序的客户端驱动)。
用户自定义的Mapper和Reducer都要继承各自的父类。Mapper中的业务逻辑写在map()方法中,Reducer的业务逻辑写在reduce()方法中。整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。
最需要注意的是:整个MapReduce程序中,数据都是以kv键值对的形式流转的。因此在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么。并且在MapReduce中数据会因为某些默认的机制进行排序进行分组。所以说kv的类型数据确定及其重要。
🔍3. MapReduce工作流程
整个MapReduce工作流程可以分为3个阶段:map
、shuffle
、reduce
。
map阶段:
shuffle阶段:
reduce阶段:
我们通过反编译源码,发现WordCount案例有Map类、Reduce类和驱动类。且数据的类型是Hadoop自身封装的序列化类型。在此仅以部分源码为例:
在上图中, 我们看到所使用的Hadoop的数据类型有Text
,IntWritable
,它们其实相当于Java中的int
和String
类型。下面先来介绍下Hadoop中的数据类型。
Hadoop 数据类型 | Java数据类型 | 备注 |
---|---|---|
BooleanWritable | boolean | 标准布尔型数值 |
ByteWritable | byte | 单字节数值 |
IntWritable | int | 整型数 |
FloatWritable | float | 浮点数 |
LongWritable | long | 长整型数 |
DoubleWritable | double | 双字节数值 |
Text | String | 使用UTF8格式存储的文本 |
MapWritable | map | 映射 |
ArrayWritable | array | 数组 |
NullWritable | null | 当<key,value>中的key或value为空时使用 |
注意:如果需要将自定义的类放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。
🔍1. 什么是序列化
序列化 (Serialization)是将结构化对象转换成字节流以便于进行网络传输或写入持久存储的过程。
反序列化(Deserialization)是将字节流转换为一系列结构化对象的过程,重新创建该对象。
为了能够清晰的认知序列化与反序列化,下面对其分别举例:
(场景一)以上面提到的Person.java为例。这个VO类中的两个字段name和age在程序运行后都在堆内存中,程序执行完毕后内存得到释放,name和age的值也不复存在。如果现在计算机要把这个类的实例发送到另一台机器、或是想保存这个VO类的实例到数据库(持久化对象),以便以后再取出来用。这时就需要对这个类进行序列化,便于传送或保存。用的时候再反序列化重新生成这个对象的实例。
(场景二)以搬桌子为例,桌子太大了不能通过比较小的门,我们要把它拆了再运进去,这个拆桌子的过程就是序列化。同理,反序列化就是等我们需要用桌子的时候再把它组合起来,这个过程就是反序列化。
简单概况: 把对象转换为字节序列的过程称为对象的序列化。 把字节序列恢复为对象的过程称为对象的反序列化。
🔍2. 为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
🔍3. 序列化的用途:
🔍4. Java的序列化机制
Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。这就需要有一种可以在两端传输数据的协议。Java序列化机制就是为了解决这个问题而产生。
Java对象序列化的机制,把对象表示成一个二进制的字节数组,里面包含了对象的数据,对象的类型信息,对象内部的数据的类型信息等等。通过保存或则转移这些二进制数组达到持久化、传递的目的。
要实现序列化,需要实现java.io.Serializable接口。反序列化是和序列化相反的过程,就是把二进制数组转化为对象的过程。
🔍5. 为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
🔍6. Hadoop的序列化机制
Hadoop的序列化没有采用java的序列化机制,而是实现了自己的序列化机制。
原因在于java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系系统等)。但在Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。
Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)。
Writable接口提供两个方法(write和readFields)。
Hadoop序列化特点:
🔍7. 自定义bean对象实现序列化接口(Writable)
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。 (1)必须实现Writable接口 (2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {
super();
}
(3)重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
(4)重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
(5)注意反序列化的顺序和序列化的顺序完全一致 (6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。 (7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。
@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}
🔍1. 需求:统计每一个手机号耗费的总上行流量、总下行流量、总流量
🔆输入数据(数据格式如下)
id 手机号码 网络ip 上行流量 下行流量 网络状态码
1 13736230513 192.196.100.1 www.Jayce.com 2481 24681 200
8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
12 15959002129 192.168.100.9 www.Jayce.com 1938 180 500
....
🔆期望输出数据格式
手机号码 上行流量 下行流量 总流量
13590439668 1116 954 2070
🔍2. 需求分析
🔍3. 源码实现(在此仅给出部分源码)
🔍4. 查看结果
通过对比结果,我们发现已经完成了目标。
WordCount中文叫做单词统计、词频统计,指的是使用程序统计某文本文件中,每个单词出现的总次数。这个是大数据计算领域经典的入门案例,虽然业务及其简单,但是希望能够通过案例感受背后MapReduce的执行流程和默认的行为机制,这才是关键。
🔍1.WordCount程序任务
程序 | WordCount |
---|---|
输入 | 一个包含大量单词的文本文件 |
输出 | 文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个字母和其频数占一行,单词和频数之间有间隔 |
🔍2.WordCount的输入与输出实例
🔍1.针对于WordCount的MapReduce编程思路我们要以下面的角度作为分析方式:
其具体过程如下图所示:
🔍2.map阶段的核心:把输入的数据经过切割,全部标记1。因此输出就是<单词,1>。
🔍3.shuffle阶段核心:经过默认的排序分区分组,key相同的单词会作为一组数据构成新的kv对。
🔍4.reduce阶段核心:处理shuffle完的一组数据,该组数据就是该单词所有的键值对。对所有的1进行累加求和,就是该单词的总次数。最终输出<单词,总次数>。
🔍1.Map过程示意图
🔍2.用户没有定义Combiner时的Reduce过程示意图
🔍3.用户有定义Combiner时的Reduce过程示意图
🔍4.源码实现(在此仅给出部分源码)
🔍5. 查看结果
所谓的运行模式讲的是:mr程序是单机运行还是分布式运行?mr程序需要的运算资源是yarn分配还是单机系统分配?
运行在何种模式 取决于下述这个参数:
mapreduce.framework.name=yarn
集群模式
mapreduce.framework.name=local
本地模式
如果不指定 默认是local模式,在mapred-default.xml
中有定义。如果代码中、运行的环境中有配置,会默认覆盖default配置。
🔍1.本地模式运行
mapreduce程序是被提交给LocalJobRunner
在本地以单进程的形式运行。而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上
本质是程序的conf中是否有mapreduce.framework.name=local
本地模式非常便于进行业务逻辑的debug。
右键直接运行main方法所在的主类即可。
🔍2.集群模式运行
将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行。处理的数据和输出结果应该位于hdfs文件系统 提交集群的实现步骤:
将程序打成jar包,然后在集群的任意一个节点上用命令启动
hadoop@Master:/opt/moudle/hadoop/myapp$ hadoop jar MapReduceDemo.jar WordCountDriver input output
hadoop@Master:/opt/moudle/hadoop/myapp$ yarn jar MapReduceDemo.jar com.buwenbuhuo.mapreduce.WordCount.WordCountDriver input output
MapReduce框架运转在<key,value>键值对
上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对可能是不同的。
🔍1.输入特点
默认读取数据的组件叫做TextInputFormat
。
关于输入路径:
🔍2.输出特点
默认输出数据的组件叫做TextOutputFormat
。
输出路径不能提前存在 否则执行报错 对输出路径进行检测判断
MapReduce可以很好地应用于各种计算问题,如下:
🔍1.用MapReduce实现关系的自然连接
🔍2.用MapReduce实现关系的自然连接
首先我们需要知道MapReduce的运行流程,其执行流程图如下:
🔍1.执行流程图
🔍2.Map阶段执行过程
🔍3.Redue阶段执行过程
在整个MapReduce程序的开发过程中,我们最大的工作量是覆盖map函数和覆盖reduce函数。
🔍1.默认情况下MR输出文件个数
在默认情况下,不管map阶段有多少个并发执行task,到reduce阶段,所有的结果都将有一个reduce来处理,并且最终结果输出到一个文件中。
此时,MapReduce的执行流程如下所示:
🔍2.修改reducetask个数
在MapReduce程序的驱动类中,通过job提供的setNumReduceTasks
方法,可以修改reducetask的个数。
默认情况下不设置,reducetask个数为1,结果输出到一个文件中。
使用api修改reducetask个数之后,输出结果文件的个数和reducetask个数对应。比如设置为5个,
// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner1.class);
// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);
此时的输出结果如下所示:
此时,MapReduce的执行流程如下所示:
🔍3.数据分区概念
当MapReduce中有多个reducetask执行的时候,此时maptask的输出就会面临一个问题:究竟将自己的输出数据交给哪一个reducetask来处理,这就是所谓的数据分区(partition)问题。
🔍4.默认分区规则
MapReduce默认分区规则是HashPartitioner
。跟map输出的数据key有关。
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
当然用户也可以自己自定义分区规则。
在MapReduce编程中,核心是牢牢把握住每个阶段的输入输出key是什么
。
因为mr中很多默认行为都跟key相关。
排序
:key的字典序a-z 正序分区
:key.hashcode % reducetask 个数分组
:key相同的分为一组最重要的是,如果觉得默认的行为不满足业务需求,MapReduce还支持自定义排序、分区、分组的规则,这将使得编程更加灵活和方便。
MapReduce体系结构主要由四个部分组成,分别是:Client、JobTracker、TaskTracker以及Task
MapReduce主要有以下4个部分组成:
🔆1.Client
🔆2.JobTracker
🔆3.TaskTracker
🔆4.Task
🔍1.概念
MapTask的并行度指的是map阶段有多少个并行的task共同处理任务。map阶段的任务处理并行度,势必影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢?
🔍2.原理机制
一个MapReducejob
的map
阶段并行度由客户端在提交job
时决定,即客户端提交job之前会对待处理数据进行逻辑切片。切片完成会形成切片规划文件(job.split
),每个逻辑切片最终对应启动一个maptask
。
逻辑切片机制由FileInputFormat
实现类的getSplits()
方法完成。
FileInputFormat
中默认的切片机制:
如下图所示:
🔍3.相关参数、优化
在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize))
;
切片主要由这几个值来运算决定:
minsize:默认值:1
配置参数: mapreduce.input.fileinputformat.split.minsize
maxsize:默认值:Long.MAXValue
配置参数:mapreduce.input.fileinputformat.split.maxsize
因此,默认情况下,split size=block size,在hadoop 2.x中为128M。
但是,不论怎么调参数,都不能让多个小文件“划入”一个split。
还有个细节就是:当bytesRemaining/splitSize > 1.1不满足的话,那么最后所有剩余的会作为一个切片
。从而不会形成例如129M文件规划成两个切片的局面。
reducetask并行度
同样影响整个job的执行并发度和执行效率,与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置
:
job.setNumReduceTasks
(4);
如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜。
注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask
。
🔍1.MapReduce执行流程图
🔍2.关于Split(分片)
HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
🔍3.Map任务的数量
Hadoop为每个split创建一个Map任务,split 的多少决定了Map任务的数目。大多数情况下,理想的分片大小是一个HDFS块
🔍4.Reduce任务的数量
最优的Reduce任务个数取决于集群中可用的reduce任务槽(slot)的数目
通常设置比reduce任务槽数目稍微小一些的Reduce任务个数(这样可以预留一些系统资源处理可能发生的错误)
🔍1. Map端的Shuffle过程
🔍2. 执行步骤
整个Map阶段流程大体如上图所示。简单概述:input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
合并(Combine)和归并(Merge)的区别: 两个键值对<“a”,1>和<“a”,1>,如果合并,会得到<“a”,2>,如果归并,会得到<“a”,<1,1>>
🔍3. 详细步骤
逻辑切片规划
得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。RecordReader
对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>
。Key表示每行首字符偏移值,value表示这一行文本内容。执行用户重写的map函数
。RecordReader读取一行这里调用一次。collect
中,会先对其进行分区
处理,默认使用HashPartitioner
。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reducetask处理。
默认对key hash后再以reducetask数量取模
。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
环形缓冲区
,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。 缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从
内存往磁盘写数据的过程被称为Spill,中文可译为溢写
。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
排序(Sort)
。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。如果job设置过Combiner
,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
临时文件
(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并
,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件
,以记录每个reduce对应数据的偏移量。至此map整个阶段结束。
🔍1. Reduce端的Shuffle过程
🔍2. 执行步骤
Reduce大致分为copy、sort、reduce
三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
🔍3. 详细步骤
Copy阶段
,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。Merge阶段
。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map
端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。数据排序
。调用reduce方法
,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。Shuffle的本意是洗牌、混洗的意思,把一组有规则的数据尽量打乱成无规则的数据。
而在MapReduce中,Shuffle更像是洗牌的逆过程,指的是将map端的无规则输出按指定的规则“打乱”成具有一定规则的数据,以便reduce端接收处理。
shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程
称作shuffle。
🔍1. Shuffle过程流程图
🔍2. 具体过程
🔍1. 概括
可以对map的输出进行压缩(map输出到reduce输入的过程,可以shuffle过程中网络传输的数据量)
可以对reduce的输出结果进行压缩(最终保存到hdfs上的数据,主要是减少占用HDFS存储)
🔍2. 压缩算法
使用hadoop checknative来查看hadoop支持的各种压缩算法,如果出现openssl为false,那么就在线安装一下依赖包。
hadoop@Master:~$ sudo apt-get install openssl
hadoop@Master:~$ sudo apt-get install libssl-dev
hadoop支持的压缩算法
压缩格式 | 工具 | 算法 | 文件扩展名 | 是否可切分 |
---|---|---|---|---|
DEFLATE | 无 | DEFLATE | .deflate | 否 |
Gzip | gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | bzip2 | bz2 | 是 |
LZO | lzop | LZO | .lzo | 否 |
LZ4 | 无 | LZ4 | .lz4 | 否 |
Snappy | 无 | Snappy | .snappy | 否 |
各种压缩算法对应使用的java类
压缩格式 | 对应使用的java类 |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DeFaultCodec |
gzip | org.apache.hadoop.io.compress.GZipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
🔍3. 压缩的设置方式
🔆方式一:代码中设置
设置map阶段的压缩
设置reduce阶段的压缩
🔆方式二:配置文件全局设置
我们可以修改mapred-site.xml配置文件,然后重启集群,以便对所有的mapreduce任务进行压缩。
map输出数据进行压缩
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
reduce输出数据进行压缩
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property> <name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>
所有节点都要修改mapred-site.xml,修改完成之后记得重启集群
🔍1.任务要求
根据上图要求,我们需要先进行以下操作
首先,在Linux系统本地创建两个文件,即文件wordfile1.txt和wordfile2.txt。在实际应用中,这两个文件可能会非常大,会被分布存储到多个节点上。但是,为了简化任务,这里的两个文件只包含几行简单的内容。需要说明的是,针对这两个小数据集样本编写的MapReduce词频统计程序,不作任何修改,就可以用来处理大规模数据集的词频统计。
🔆文件wordfile1.txt的内容如下:
My name is len
I love China
🔆文件wordfile2.txt的内容如下:
I am from China
假设HDFS中有一个/user/hadoop/input文件夹,并且文件夹为空,请把文件wordfile1.txt和wordfile2.txt上传到HDFS中的input文件夹下。现在需要设计一个词频统计程序,统计input文件夹下所有文件中每个单词的出现次数,也就是说,程序应该输出如下形式的结果:
My 1
name 1
is 1
len 1
I 2
love 1
China 2
am 1
from 1
🔍2.编写Map处理逻辑
🔆源码实现:
🔍3.编写Reduce处理逻辑
🔆源码实现:
🔍4.编写main方法
🔍5.完整源码
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}
🔍1.在Eclipse中创建项目
首先,启动Eclipse,启动以后会弹出如下图所示界面,提示设置工作空间(workspace)。
可以直接采用默认的设置“/home/hadoop/workspace”,点击“OK”按钮。可以看出,由于当前是采用hadoop用户登录了Linux系统,因此,默认的工作空间目录位于hadoop用户目录“/home/hadoop”下。
Eclipse启动以后,呈现的界面如下图所示。
选择“File–>New–>Java Project”菜单,开始创建一个Java工程,弹出如下图所示界面。
在“Project name”后面输入工程名称“WordCount”,选中“Use default location”,让这个Java工程的所有文件都保存到“/home/hadoop/workspace/WordCount”目录下。在“JRE”这个选项卡中,可以选择当前的Linux系统中已经安装好的JDK,比如jdk1.8。然后,点击界面底部的“Next>”按钮,进入下一步的设置。
🔍2.为项目添加需要用到的JAR包
进入下一步的设置以后,会弹出如下图所示界面。
需要在这个界面中加载该Java工程所需要用到的JAR包,这些JAR包中包含了与Hadoop相关的Java API。这些JAR包都位于Linux系统的Hadoop安装目录下,对于本文章而言,就是在“/opt/moudle/hadoop/share/hadoop”目录下。点击界面中的“Libraries”选项卡,然后,点击界面右侧的“Add External JARs…”按钮,弹出如下图所示界面。
在该界面中,上面有一排目录按钮(即“opt”、“moudle”、“hadoop”、“share”、“hadoop”、“mapreduce”和“lib”),当点击某个目录按钮时,就会在下面列出该目录的内容。 为了编写一个MapReduce程序,一般需要向Java工程中添加以下JAR包: (1)“/opt/moudle/hadoop/share/hadoop/common”目录下的hadoop-common-3.1.3.jar和haoop-nfs-3.1.3.jar; (2)“/opt/moudle/hadoop/share/hadoop/common/lib”目录下的所有JAR包; (3)“/opt/moudle/hadoop/share/hadoop/mapreduce”目录下的所有JAR包,但是,不包括jdiff、lib、lib-examples和sources目录,具体如下图所示。
(4)“/opt/moudle/hadoop/share/hadoop/mapreduce/lib”目录下的所有JAR包。 比如,如果要把“/opt/moudle/hadoop/share/hadoop/common”目录下的hadoop-common-3.1.3.jar和haoop-kms-3.1.3.jar添加到当前的Java工程中,可以在界面中点击相应的目录按钮,进入到common目录,然后,界面会显示出common目录下的所有内容(如下图所示)。
请在界面中用鼠标点击选中hadoop-common-3.1.3.jar和haoop-kms-3.1.3.jar,然后点击界面右下角的“确定”按钮,就可以把这两个JAR包增加到当前Java工程中,出现的界面如下图所示。
从这个界面中可以看出,hadoop-common-3.1.3.jar和haoop-kms-3.1.3.jar已经被添加到当前Java工程中。然后,按照类似的操作方法,可以再次点击“Add External JARs…”按钮,把剩余的其他JAR包都添加进来。需要注意的是,当需要选中某个目录下的所有JAR包时,可以使用“Ctrl+A”组合键进行全选操作。全部添加完毕以后,就可以点击界面右下角的“Finish”按钮,完成Java工程WordCount的创建。
🔍3. 编写java程序
由于程序源码已经在分析时实现,所以接下来直接进行操作。
选择“New–>Class”菜单以后会出现如下图所示界面。
在该界面中,只需要在“Name”后面输入新建的Java类文件的名称,这里采用名称“WordCount”,其他都可以采用默认设置,然后,点击界面右下角“Finish”按钮,出现如下图所示界面。
可以看出,Eclipse自动创建了一个名为“WordCount.java”的源代码文件,并且包含了代码“public class WordCount{}”,请清空该文件里面的代码,然后在该文件中输入完整的词频统计程序代码,具体如下(也可在源码分析部分直接复制):
🔍4. 编译打包程序
现在就可以编译上面编写的代码。可以直接点击Eclipse工作界面上部的运行程序的快捷按钮,当把鼠标移动到该按钮上时,在弹出的菜单中选择“Run as”,继续在弹出来的菜单中选择“Java Application”,运行之后会如下图所示界面。
下面就可以把Java应用程序打包生成JAR包,部署到Hadoop平台上运行。现在可以把词频统计程序放在“/opt/moudle/hadoop/myapp”目录下。如果该目录不存在,可以使用如下命令创建:
hadoop@Master:/opt/moudle/hadoop$ cd /usr/local/hadoop
hadoop@Master:/opt/moudle/hadoop$ mkdir myapp
首先,请在Eclipse工作界面左侧的“Package Explorer”面板中,在工程名称“WordCount”上点击鼠标右键,在弹出的菜单中选择“Export”,如下图所示。
然后,会弹出如下图所示界面。
在该界面中,选择“Runnable JAR file”,然后,点击“Next>”按钮,弹出如下图所示界面。
在该界面中,“Launch configuration”用于设置生成的JAR包被部署启动时运行的主类,需要在下拉列表中选择刚才配置的类“WordCount-WordCount”。在“Export destination”中需要设置JAR包要输出保存到哪个目录,比如,这里设置为“/opt/moudle/hadoop/myapp/WordCount.jar”。在“Library handling”下面选择“Extract required libraries into generated JAR”。然后,点击“Finish”按钮,会出现如下图所示界面。
可以忽略该界面的信息,直接点击界面右下角的“OK”按钮,启动打包过程。打包过程结束后,会出现一个警告信息界面,如下图所示。
可以忽略该界面的信息,直接点击界面右下角的“OK”按钮。至此,已经顺利把WordCount工程打包生成了WordCount.jar。可以到Linux系统中查看一下生成的WordCount.jar文件,可以在Linux的终端中执行如下命令:
hadoop@Master:~$ cd /opt/moudle/hadoop/myapp/
hadoop@Master:/opt/moudle/hadoop/myapp$ ll
总用量 93976
drwxrwxr-x 2 hadoop hadoop 4096 9月 27 15:16 ./
drwxr-xr-x 14 hadoop hadoop 4096 9月 28 20:11 ../
-rw-rw-r-- 1 hadoop hadoop 57722832 9月 27 15:16 HDFSExample.jar
-rw-rw-r-- 1 hadoop hadoop 5889 9月 20 15:06 MapReduceDemo.jar
-rw-rw-r-- 1 hadoop hadoop 38487431 9月 28 20:18 WordCount.jar
可以看到,“/opt/moudle/hadoop/myapp”目录下已经存在一个WordCount.jar文件。
在运行程序之前,需要启动Hadoop,命令如下:
hadoop@Master:/opt/moudle/hadoop$ start-all.sh
在启动Hadoop之后,需要首先删除HDFS中与当前Linux用户hadoop对应的output目录(即HDFS中的“/user/hadoop/output”目录),这样确保后面程序运行不会出现问题,具体命令如下:
hadoop@Master:/opt/moudle/hadoop$ hdfs dfs -rm -r /user/hadoop/output
然后,把之前在在Linux本地文件系统中新建的两个文件wordfile1.txt和wordfile2.txt(假设这两个文件位于“/opt/moudle/hadoop”目录下,并且里面包含了一些英文语句),上传到HDFS中的“/user/hadoop/input”目录下,命令如下:
hadoop@Master:/opt/moudle/hadoop$ hdfs dfs -put wordfile1.txt /user/hadoop/input
2021-09-28 20:11:27,930 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
hadoop@Master:/opt/moudle/hadoop$ hdfs dfs -put wordfile2.txt /user/hadoop/input
2021-09-28 20:11:34,434 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
hadoop@Master:/opt/moudle/hadoop$ hdfs dfs -ls /user/hadoop/input
Found 2 items
-rw-r--r-- 3 hadoop supergroup 28 2021-09-28 20:11 /user/hadoop/input/wordfile1.txt
-rw-r--r-- 3 hadoop supergroup 16 2021-09-28 20:11 /user/hadoop/input/wordfile2.txt
现在,就可以在Linux系统中,使用hadoop jar命令运行程序,命令如下:
hadoop@Master:~$ cd /opt/moudle/hadoop/myapp/
hadoop@Master:/opt/moudle/hadoop/myapp$ hadoop jar WordCount.jar input output
上面命令执行以后,当运行顺利结束时,屏幕上会显示类似如下的信息:
……//这里省略若干屏幕信息
2021-09-28 20:26:55,683 INFO mapreduce.Job: Running job: job_1632830686414_0001
2021-09-28 20:27:03,779 INFO mapreduce.Job: Job job_1632830686414_0001 running in uber mode : false
2021-09-28 20:27:03,782 INFO mapreduce.Job: map 0% reduce 0%
2021-09-28 20:27:08,847 INFO mapreduce.Job: map 100% reduce 0%
2021-09-28 20:27:16,889 INFO mapreduce.Job: map 100% reduce 100%
2021-09-28 20:27:16,897 INFO mapreduce.Job: Job job_1632830686414_0001 completed successfully
2021-09-28 20:27:16,983 INFO mapreduce.Job: Counters: 53
File System Counters
FILE: Number of bytes read=116
FILE: Number of bytes written=653040
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=274
HDFS: Number of bytes written=54
HDFS: Number of read operations=11
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=2
Launched reduce tasks=1
Data-local map tasks=2
Total time spent by all maps in occupied slots (ms)=5887
Total time spent by all reduces in occupied slots (ms)=4688
Total time spent by all map tasks (ms)=5887
Total time spent by all reduce tasks (ms)=4688
Total vcore-milliseconds taken by all map tasks=5887
Total vcore-milliseconds taken by all reduce tasks=4688
Total megabyte-milliseconds taken by all map tasks=6028288
Total megabyte-milliseconds taken by all reduce tasks=4800512
Map-Reduce Framework
Map input records=3
Map output records=11
Map output bytes=88
Map output materialized bytes=122
Input split bytes=230
Combine input records=11
Combine output records=11
Reduce input groups=9
Reduce shuffle bytes=122
Reduce input records=11
Reduce output records=9
Spilled Records=22
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=321
CPU time spent (ms)=1510
Physical memory (bytes) snapshot=770977792
Virtual memory (bytes) snapshot=7914119168
Total committed heap usage (bytes)=689963008
Peak Map Physical memory (bytes)=291536896
Peak Map Virtual memory (bytes)=2637176832
Peak Reduce Physical memory (bytes)=190496768
Peak Reduce Virtual memory (bytes)=2641076224
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=44
File Output Format Counters
Bytes Written=54
词频统计结果已经被写入了HDFS的“/user/hadoop/output”目录中,可以执行如下命令查看词频统计结果:
hadoop@Master:/opt/moudle/hadoop/myapp$ hdfs dfs -cat output/*
下面我们来进行对比以下,看看其结果是否和我们预期的一样
通过对比,我们发现和预期结果一样,那么说明我们的操作是没有任何问题的。
至此,词频统计程序顺利运行结束。需要注意的是,如果要再次运行WordCount.jar,需要首先删除HDFS中的output目录,否则会报错。
🔍 数据简介
现有美国2021-1-27号,各个县county的新冠疫情累计案例信息,包括确诊病例和死亡病例,数据格式如下所示:
2020-01-27,Maricopa,Arizona,04013,1,0
2020-01-27,Los Angeles,California,06037,1,0
2020-01-27,Orange,California,06059,1,0
2020-01-27,Cook,Illinois,17031,1,0
2020-01-27,Snohomish,Washington,53061,1,0
字段含义如下:date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)。
希望能够通过此案例学会自定义MapReduce各个组件。包括自定义对象、序列化、排序、分区、分组。
需要数据及源码可自行下载:链接:https://pan.baidu.com/s/1IZKRtXvbNQpnimBizR36Pw 提取码:4x1d
🔍1.依赖 下面为所用到的依赖
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
🔍2.新建log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
🔍1.需求:统计美国,每个州state累计确诊案例数、累计死亡案例数。
🔍2.需求分析
自定义一个对象CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。注意需要实现hadoop的序列化机制。
以州state作为map阶段输出的key,以CovidCountBean作为value,这样经过MapReduce的默认排序分组规则,属于同一个州的数据就会变成一组进行reduce处理,进行累加即可得出每个州累计确诊病例。
🔍3.代码实现
🔆1.自定义Bean
/**
* @author 不温卜火
* @create 2021-09-30 12:57
* @MyBlog https://buwenbuhuo.blog.csdn.net
*/
public class CovidCountBean implements Writable {
private long cases;//确诊病例数
private long deaths;//死亡病例数
public CovidCountBean() {
}
public CovidCountBean(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
public long getDeaths() {
return deaths;
}
public void setDeaths(long deaths) {
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"\t"+ deaths;
}
}
🔆2.Mapper类
public class CovidSumMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> {
Text outKey = new Text();
CovidCountBean outValue = new CovidCountBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//州
outKey.set(fields[2]);
//Covid数据 确诊病例 死亡病例
outValue.set(Long.parseLong(fields[fields.length-2]),Long.parseLong(fields[fields.length-1]));
//map输出结果
context.write(outKey,outValue);
}
}
🔆3.Reducer类
public class CovidSumReducer extends Reducer<Text, CovidCountBean,Text,CovidCountBean> {
CovidCountBean outValue = new CovidCountBean();
@Override
protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException {
long totalCases = 0;
long totalDeaths =0;
//累加统计
for (CovidCountBean value : values) {
totalCases += value.getCases();
totalDeaths +=value.getDeaths();
}
outValue.set(totalCases,totalDeaths);
context.write(key,outValue);
}
}
🔆4.Driver类
// Mapreduce 自定义对象
public class CovidSumDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidSumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidSumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidSumMapper.class);
job.setReducerClass(CovidSumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CovidCountBean.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输入数据路径
FileInputFormat.setInputPaths(job, new Path("E:\\inputCOVID"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path("E:\\outputCOVID"));
//判断输出路径是否存在 如果存在删除
Path outputPath = new Path("E:\\outputCOVID");
outputPath.getFileSystem(conf).delete(outputPath,true);
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
🔍4.代码执行结果
🔍1.需求:将美国,每个州state的确诊案例数进行倒序排序。
🔍2.需求分析
如果你的需求中需要根据某个属性进行排序 ,不妨把这个属性作为key。因为MapReduce中key有默认排序行为的。但是需要进行如下考虑:
🔍3.代码实现
🔆1.自定义Bean
public class CovidCountBean implements WritableComparable<CovidCountBean> {
private long cases;//确诊病例数
private long deaths;//死亡病例数
public CovidCountBean() {
}
public CovidCountBean(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(long cases, long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
public long getDeaths() {
return deaths;
}
public void setDeaths(long deaths) {
this.deaths = deaths;
}
/**
* 序列化方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化方法 注意顺序
*/
@Override
public void readFields(DataInput in) throws IOException {
this.cases = in.readLong();
this.deaths =in.readLong();
}
@Override
public String toString() {
return cases +"\t"+ deaths;
}
/**
* 排序比较器 本业务中根据确诊案例数倒序排序
*/
@Override
public int compareTo(CovidCountBean o) {
return this.cases - o.getCases()> 0 ? -1:(this.cases - o.getCases() < 0 ? 1 : 0);
}
}
🔆2.Mapper类
public class CovidSortSumMapper extends Mapper<LongWritable, Text, CovidCountBean,Text> {
CovidCountBean outKey = new CovidCountBean();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
outKey.set(Long.parseLong(fields[1]),Long.parseLong(fields[2]));
outValue.set(fields[0]);
context.write(outKey,outValue);
}
}
🔆3.Reducer类
public class CovidSortSumReducer extends Reducer<CovidCountBean, Text,Text,CovidCountBean> {
@Override
protected void reduce(CovidCountBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text outKey = values.iterator().next();
context.write(outKey,key);
}
}
🔆4.Driver类
public class CovidSortSumDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidSortSumDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidSortSumDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidSortSumMapper.class);
job.setReducerClass(CovidSortSumReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidCountBean.class);
job.setMapOutputValueClass(Text.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输入数据路径
FileInputFormat.setInputPaths(job, new Path("E:\\outputCOVID"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path("E:\\outputCOVID1"));
//判断输出路径是否存在 如果存在删除
Path outputPath = new Path("E:\\outputCOVID1");
outputPath.getFileSystem(conf).delete(outputPath,true);
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
🔍4.代码执行结果
🔍1.需求:将美国每个州的疫情数据输出到各自不同的文件中,即一个州的数据在一个结果文件中。
🔍2.需求分析
输出到不同文件中–>reducetask有多个(>2)–>默认只有1个,如何有多个?—>可以设置,job.setNumReduceTasks(N)—>当有多个reducetask 意味着数据分区---->默认分区规则是什么? hashPartitioner—>默认分区规则符合你的业务需求么?---->符合,直接使用—>不符合,自定义分区。
🔍3.代码实现
🔆1.自定义分区器
public class StatePartitioner extends Partitioner<Text, Text> {
//模拟美国各州数据字典 实际中可以从redis中快速查询 如果数据不大也可以使用数据集合保存
public static HashMap<String, Integer> stateMap = new HashMap<String, Integer>();
static{
stateMap.put("Alabama", 0);
stateMap.put("Arkansas", 1);
stateMap.put("California", 2);
stateMap.put("Florida", 3);
stateMap.put("Indiana", 4);
}
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer code = stateMap.get(key.toString());
if (code != null) {
return code;
}
return 5;
}
}
🔆2.Mapper类
public class CovidPartitionMapper extends Mapper<LongWritable, Text,Text, Text> {
Text outKey = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(",");
//以州作为输出的key
outKey.set(splits[2]);
context.write(outKey,value);
}
}
🔆3.Reducer类
public class CovidPartitionReducer extends Reducer<Text,Text,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
context.write(value,NullWritable.get());
}
}
}
🔆4.Driver类
public class CovidPartitionDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidPartitionDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidPartitionDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidPartitionMapper.class);
job.setReducerClass(CovidPartitionReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//todo 设置reducetask个数 和自定义分区器
job.setNumReduceTasks(8);
job.setPartitionerClass(StatePartitioner.class);
// 配置作业的输入数据路径
FileInputFormat.setInputPaths(job, new Path("E:\\inputCOVID"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path("E:\\outputCOVID2"));
//判断输出路径是否存在 如果存在删除
Path outputPath = new Path("E:\\outputCOVID2");
outputPath.getFileSystem(conf).delete(outputPath,true);
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
🔍4.代码执行结果
分区个数和reducetask个数关系 正常情况下: 分区的个数 = reducetask个数。
每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能,是MapReduce的一种优化手段之一。
具体实现步骤:
1、 自定义一个combiner继承Reducer,重写reduce方法
2、 在job中设置: job.setCombinerClass(CustomCombiner.class)
combiner能够应用的前提是不能影响最终的业务逻辑,而且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。下述场景禁止使用,不仅优化了数据量,还改变了最终的结果
🔍1.需求:统计过程中对每一个MapTask的输出进行局部汇总,以减小网络传输量即采用Combiner功能。
🔍2.需求分析
🔍3.代码实现
🔆1.自定义一个combiner继承Reducer,重写reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
//封装outKV
outV.set(sum);
//写出outKV
context.write(key,outV);
}
}
🔆2.在job中设置: job.setCombinerClass(CustomCombiner.class)
// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordCountReducer.class);
🔍4.代码执行结果
分组在发生在reduce阶段,决定了同一个reduce中哪些数据将组成一组去调用reduce方法处理。默认分组规则是:key相同的就会分为一组(前后两个key直接比较是否相等)。
需要注意的是,在reduce阶段进行分组之前,因为进行数据排序行为,因此排序+分组将会使得key一样的数据一定被分到同一组,一组去调用reduce方法处理。
此外,用户还可以自定义分组规则:
写类继承 WritableComparator,重写Compare方法。
只要Compare方法返回为0,MapReduce框架在分组的时候就会认为前后两个相等,分为一组。还需要在job对象中进行设置 才能让自己的重写分组类生效。
job.setGroupingComparatorClass(xxxx.class);
🔍1.需求:找出美国每个州state的确诊案例数最多的县county是哪一个。该问题也是俗称的TopN问题
🔍2.需求分析
自定义对象,在map阶段将“州state和累计确诊病例数cases”作为key输出,重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。
在reduce端利用自定义分组规则,将州state相同的分为一组,然后取第一个即是最大值。
🔍3.代码实现
🔆1.自定义Bean
public class CovidBean implements WritableComparable<CovidBean> {
private String state;//州
private String county;//县
private long cases;//确诊病例
public CovidBean() {
}
public CovidBean(String state, String county, long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
public void set (String state, String county, long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
public String getState() {
return state;
}
public void setState(String state) {
this.state = state;
}
public String getCounty() {
return county;
}
public void setCounty(String county) {
this.county = county;
}
public long getCases() {
return cases;
}
public void setCases(long cases) {
this.cases = cases;
}
@Override
public String toString() {
return "CovidBean{" +
"state='" + state + '\'' +
", county='" + county + '\'' +
", cases=" + cases +
'}';
}
//todo 排序规则 根据州state正序进行排序 如果州相同 则根据确诊数量cases倒序排序
@Override
public int compareTo(CovidBean o) {
int result ;
int i = state.compareTo(o.getState());
if ( i > 0) {
result =1;
} else if (i <0 ) {
result = -1;
} else {
// 确诊病例数倒序排序
result = cases > o.getCases() ? -1 : 1;
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(state);
out.writeUTF(county);
out.writeLong(cases);
}
@Override
public void readFields(DataInput in) throws IOException {
this.state =in.readUTF();
this.county =in.readUTF();
this.cases =in.readLong();
}
}
🔆2.Mapper类
public class CovidTop1Mapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {
CovidBean outKey = new CovidBean();
NullWritable outValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//封装数据: 州 县 确诊病例
outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));
context.write(outKey,outValue);
}
}
🔆3.自定义分组
public class CovidGroupingComparator extends WritableComparator {
protected CovidGroupingComparator(){
super(CovidBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
CovidBean aBean = (CovidBean) a;
CovidBean bBean = (CovidBean) b;
return aBean.getState().compareTo(bBean.getState());
}
}
🔆4.Reducer类
public class CovidTop1Reducer extends Reducer<CovidBean, NullWritable,CovidBean,NullWritable> {
@Override
protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//不遍历迭代器,此时key就是分组中的第一个key 也就是该州确诊病例数最多的县对应的数据
context.write(key,NullWritable.get());
}
}
🔆5.Driver类
public class CovidTop1Driver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidTop1Driver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidTop1Driver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidTop1Mapper.class);
job.setReducerClass(CovidTop1Reducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidBean.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CovidBean.class);
job.setOutputValueClass(NullWritable.class);
//todo 设置自定义分组
job.setGroupingComparatorClass(CovidGroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.setInputPaths(job, new Path("E:\\inputCOVID"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path("E:\\outputCOVID3"));
//判断输出路径是否存在 如果存在删除
Path outputPath = new Path("E:\\outputCOVID3");
outputPath.getFileSystem(conf).delete(outputPath,true);
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
🔍4.代码执行结果
🔍1.需求:找出美国每个州state的确诊案例数最多的县county前10个。Top10问题
🔍2.需求分析
自定义对象,在map阶段将“州state和累计确诊病例数cases”作为key输出,重写对象的排序规则,首先根据州的正序排序,如果州相等,按照确诊病例数cases倒序排序,发送到reduce。
在reduce端利用自定义分组规则,将州state相同的分为一组,然后遍历取值,取出每组中的前10个即可。
为了验证验证结果方便,可以在输出的时候以cases作为value,实际上为空即可,value并不实际意义。
🔍3.代码实现
🔆1.自定义分区器
public class StatePartitioner extends Partitioner<Text, Text> {
//模拟美国各州数据字典 实际中可以从redis中快速查询 如果数据不大也可以使用数据集合保存
public static HashMap<String, Integer> stateMap = new HashMap<String, Integer>();
static{
stateMap.put("Alabama", 0);
stateMap.put("Arkansas", 1);
stateMap.put("California", 2);
stateMap.put("Florida", 3);
stateMap.put("Indiana", 4);
}
@Override
public int getPartition(Text key, Text value, int numPartitions) {
Integer code = stateMap.get(key.toString());
if (code != null) {
return code;
}
return 5;
}
}
🔆2.Mapper类
public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean,LongWritable> {
CovidBean outKey = new CovidBean();
LongWritable outValue = new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
//封装数据: 州 县 确诊病例
outKey.set(fields[2],fields[1],Long.parseLong(fields[4]));
outValue.set(Long.parseLong(fields[4]));
context.write(outKey,outValue);
}
🔆3.Reducer类
public class CovidTopNReducer extends Reducer<CovidBean, LongWritable,CovidBean,LongWritable> {
@Override
protected void reduce(CovidBean key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int num =0;
for (LongWritable value : values) {
if(num < 10 ){ //输出每个州最多的前10个
context.write(key,value);
num++;
}else{
return;
}
}
}
}
🔆4.Driver类
public class CovidTopNDriver {
public static void main(String[] args) throws Exception{
//配置文件对象
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, CovidTopNDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidTopNDriver.class);
// 设置作业mapper reducer类
job.setMapperClass(CovidTopNMapper.class);
job.setReducerClass(CovidTopNReducer.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(CovidBean.class);
job.setMapOutputValueClass(LongWritable.class);
//设置作业reducer阶段输出key value数据类型 也就是程序最终输出数据类型
job.setOutputKeyClass(CovidBean.class);
job.setOutputValueClass(LongWritable.class);
//todo 设置自定义分组
job.setGroupingComparatorClass(CovidGroupingComparator.class);
// 配置作业的输入数据路径
FileInputFormat.setInputPaths(job, new Path("E:\\inputCOVID"));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path("E:\\outputCOVID4"));
//判断输出路径是否存在 如果存在删除
Path outputPath = new Path("E:\\outputCOVID4");
outputPath.getFileSystem(conf).delete(outputPath,true);
// 提交作业并等待执行完成
boolean resultFlag = job.waitForCompletion(true);
//程序退出
System.exit(resultFlag ? 0 :1);
}
}
🔍4.代码执行结果
1.《Hadoop权威指南》 2.MapReduce Tutorial 3.谷歌三大论文之一《Google MapReduce》 4.厦门大学林子雨《大数据技术原理与应用》第三版 5.极道科技关于分布式计算和并行计算的区别与联系的知乎问答 6.Apache Hadoop Main 3.1.3 API 7.Apache Hadoop官方文档 8.尚硅谷大数据Hadoop 3.x 9.黑马程序员Hadoop3.x全套教程 …
本篇文章到这里就结束了,如有不足请指出~