通过WordCount程序理解MapReduce编程模型
WordCount,名为单词统计,功能是统计文本文件中每个单词出现的次数。例如下图中,有两个文本(蓝色),其中一个含有两个单词(Hadoop和HDFS),另一个含有两个单词(Hadoop和MapReduce),通过统计计算,最终结果(橙色)中显示Hadoop单词出现2次,HDFS单词出现1次,MapReduce单词出现1次。
WordCount是最简单也是最体现MapReduce思想的程序之一,被成为MapReduce版的HelloWorld。
在部署好Hadoop环境后,一般执行该程序进行测试,在之前的《Hadoop伪分布式环境部署》一文中,我们提到如何运行它。该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。
今天就带大家一步一步的写一下WordCount程序,帮助大家理解MapReduce编程模型。
1、新建一个类TestWordCount
publicclassTestWordCount {
}
2、一个标准的MapReduce分为3个部分:Map、Reduce、Driver
publicclassTestWordCount {
//1.Map
//2.Reduce
//3.Driver
}
3、首先,先写一个Map,定义一个名为TestMapper的内部类,该类继承Mapper
//1.Map
publicstaticclassTestMapperextendsMapper {
}
其中,Mapper类来自org.apache.hadoop.mapreduce包,同时需要定义泛型如下:
//1.Map
publicstaticclassTestMapperextendsMapper {
}
4、Mapper泛型的定义
表示数据输入分片之后的键值对进入map的类型,分片默认“按行”划分,此时KEYIN就是偏移量(第几行),VALUEIN就是那一行文本。
例如文本:
文本按行分为2片,每片作为键值对作为map的输入,每片数据均调用1次map方法:
因此,KEYIN和VALUEIN应该定义为IntWritable和Text类型(这些都是Map Reduce编程模型内部的类型,可以理解为Java里面的Integer和String类型)
//1.Map
publicstaticclassTestMapperextendsMapper {
}
那么KEYOUT和VALUEOUT呢?它们表示map的结果输出。
例如:经过map后,累计这一行每个单词出现的次数,即map的结果是、。
现在你知道KEYOUT和VALUEOUT类型应该是什么了吧?应该是Text和IntWritable。
//1.Map
publicstaticclassTestMapperextendsMapper {
}
5、在TestMapper类中,定义两个用于map输出的成员变量
publicclassTestMapperextendsMapper {
privateTextmapKey=newText();
privateIntWritablemapValue=newIntWritable(1);
……
}
6、重写Mapper类的map方法
//1.Map
publicstaticclassTestMapperextendsMapper {
@Override
protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)
throwsIOException, InterruptedException {
}
}
map方法用于实现具体业务逻辑:
读取文件中每一行
对每一行分割单词
对每个单词做次数统计,生成加1
//1.Map
publicstaticclassTestMapperextendsMapper {
privateTextmapKey=newText();
privateIntWritablemapValue=newIntWritable(1);
@Override
protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)
throwsIOException, InterruptedException {
Stringline=value.toString();
String[]strs=line.split(" ");
for(Stringstr:strs) {
mapKey.set(value);
context.write(mapKey,mapValue);
}
}
}
至此,Map过程结束,下面该Reduce过程。
7、写一个Reduce,定义一个名为TestReducer的内部类,该类继承Reducer
//2.Reduce
publicstaticclassTestReducerextendsReducer {
}
8、Reducer泛型的定义
Map的输出就是Reduce的输入,因此Reducer的KEYIN和VALUEIN分别是Text和IntWritable,Reducer的KEYOUT和VALUEOUT是不是也应该是Text和IntWritable呢?因为最终的统计结果每个单词的次数。
//2.Reduce
publicstaticclassTestReducerextendsReducer{
}
9、在TestReducer类中,定义用于reduce输出结果的成员变量
//2.Reduce
publicstaticclassTestReducerextendsReducer {
IntWritableoutValue=newIntWritable();
……
}
10、重写Reducer类的reduce方法
//2.Reduce
publicstaticclassTestReducerextendsReducer {
@Override
protectedvoidreduce(Text key,Iterable values,Reducer.Context context)throwsIOException, InterruptedException {
}
}
根据前面map结果:
对于Hadoop单词出现了2次:
Reduce的作用就是合并汇总,Reduce的Key就是Map输出结果的Key,Value应该是List(1,1),因此Reduce就是把相同key的value计数在一次(reduce方法里面的values是Iterable类型)。
reduce方法用于实现具体业务逻辑:
定义用于累加和的sum
将key的每个value累加到sum
sum累加后的结果是outValue
//2.Reduce
publicstaticclassTestReducerextendsReducer {
IntWritableoutValue=newIntWritable();
@Override
protectedvoidreduce(Textkey, Iterablevalues, Reducer.Contextcontext)throwsIOException, InterruptedException {
intsum= 0;
for(IntWritablevalue:values) {
sum+=value.get();
}
outValue.set(sum);
context.write(key,outValue)
}
}
至此,Map和Reduce过程结束,但还没有将Map和Reduce串联起来,需要一个Job。
public classTestWordCountextendsConfiguredimplementsTool{
……
publicintrun(String[]args)throwsException {
//读取Hadoop配置
Configurationconfiguration=this.getConf();
//创建job
Jobjob= Job.getInstance(configuration,this.getClass().getSimpleName());
// jar包的入口
job.setJarByClass(this.getClass());
//设置输入数据路径
PathinPath=newPath(args[0]);
FileInputFormat.addInputPath(job,inPath);
//设置输出数据路径
PathoutPath=newPath(args[1]);
FileOutputFormat.setOutputPath(job,outPath);
//指定Mapper类,map输出key和value类型
job.setMapperClass(TestMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定Reducer类,reduce输出key和value类型
job.setReducerClass(TestReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//提交任务
booleanisSuccess=job.waitForCompletion(true);
returnisSuccess ? 0 : 1;
}
}
12、定义Driver,在这里就是main方法
publicclassTestWordCount {
//3.Driver
publicstaticvoidmain(String[]args)throwsException {
Configurationconfiguration=newConfiguration();
args=newString[] {"hdfs://bigdata.com:8020/input",
"hdfs://bigdata.com:8020/output"};
intstatus= ToolRunner.run(configuration,newTestWordCount(),args);
System.exit(status);
}
}
至此,WordCount写了近100行代码。不如按照这个思路自己写一写代码,理解下MapReduce编程模型的思路。
领取专属 10元无门槛券
私享最新 技术干货