MapReduce编程模型

通过WordCount程序理解MapReduce编程模型

WordCount,名为单词统计,功能是统计文本文件中每个单词出现的次数。例如下图中,有两个文本(蓝色),其中一个含有两个单词(Hadoop和HDFS),另一个含有两个单词(Hadoop和MapReduce),通过统计计算,最终结果(橙色)中显示Hadoop单词出现2次,HDFS单词出现1次,MapReduce单词出现1次。

WordCount是最简单也是最体现MapReduce思想的程序之一,被成为MapReduce版的HelloWorld。

在部署好Hadoop环境后,一般执行该程序进行测试,在之前的《Hadoop伪分布式环境部署》一文中,我们提到如何运行它。该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。

今天就带大家一步一步的写一下WordCount程序,帮助大家理解MapReduce编程模型。

1、新建一个类TestWordCount

publicclassTestWordCount {

}

2、一个标准的MapReduce分为3个部分:Map、Reduce、Driver

publicclassTestWordCount {

//1.Map

//2.Reduce

//3.Driver

}

3、首先,先写一个Map,定义一个名为TestMapper的内部类,该类继承Mapper

//1.Map

publicstaticclassTestMapperextendsMapper {

}

其中,Mapper类来自org.apache.hadoop.mapreduce包,同时需要定义泛型如下:

//1.Map

publicstaticclassTestMapperextendsMapper {

}

4、Mapper泛型的定义

表示数据输入分片之后的键值对进入map的类型,分片默认“按行”划分,此时KEYIN就是偏移量(第几行),VALUEIN就是那一行文本。

例如文本:

文本按行分为2片,每片作为键值对作为map的输入,每片数据均调用1次map方法:

因此,KEYIN和VALUEIN应该定义为IntWritable和Text类型(这些都是Map Reduce编程模型内部的类型,可以理解为Java里面的Integer和String类型)

//1.Map

publicstaticclassTestMapperextendsMapper {

}

那么KEYOUT和VALUEOUT呢?它们表示map的结果输出。

例如:经过map后,累计这一行每个单词出现的次数,即map的结果是、。

现在你知道KEYOUT和VALUEOUT类型应该是什么了吧?应该是Text和IntWritable。

//1.Map

publicstaticclassTestMapperextendsMapper {

}

5、在TestMapper类中,定义两个用于map输出的成员变量

publicclassTestMapperextendsMapper {

privateTextmapKey=newText();

privateIntWritablemapValue=newIntWritable(1);

……

}

6、重写Mapper类的map方法

//1.Map

publicstaticclassTestMapperextendsMapper {

@Override

protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)

throwsIOException, InterruptedException {

}

}

map方法用于实现具体业务逻辑:

读取文件中每一行

对每一行分割单词

对每个单词做次数统计,生成加1

//1.Map

publicstaticclassTestMapperextendsMapper {

privateTextmapKey=newText();

privateIntWritablemapValue=newIntWritable(1);

@Override

protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)

throwsIOException, InterruptedException {

Stringline=value.toString();

String[]strs=line.split(" ");

for(Stringstr:strs) {

mapKey.set(value);

context.write(mapKey,mapValue);

}

}

}

至此,Map过程结束,下面该Reduce过程。

7、写一个Reduce,定义一个名为TestReducer的内部类,该类继承Reducer

//2.Reduce

publicstaticclassTestReducerextendsReducer {

}

8、Reducer泛型的定义

Map的输出就是Reduce的输入,因此Reducer的KEYIN和VALUEIN分别是Text和IntWritable,Reducer的KEYOUT和VALUEOUT是不是也应该是Text和IntWritable呢?因为最终的统计结果每个单词的次数。

//2.Reduce

publicstaticclassTestReducerextendsReducer{

}

9、在TestReducer类中,定义用于reduce输出结果的成员变量

//2.Reduce

publicstaticclassTestReducerextendsReducer {

IntWritableoutValue=newIntWritable();

……

}

10、重写Reducer类的reduce方法

//2.Reduce

publicstaticclassTestReducerextendsReducer {

@Override

protectedvoidreduce(Text key,Iterable values,Reducer.Context context)throwsIOException, InterruptedException {

}

}

根据前面map结果:

对于Hadoop单词出现了2次:

Reduce的作用就是合并汇总,Reduce的Key就是Map输出结果的Key,Value应该是List(1,1),因此Reduce就是把相同key的value计数在一次(reduce方法里面的values是Iterable类型)。

reduce方法用于实现具体业务逻辑:

定义用于累加和的sum

将key的每个value累加到sum

sum累加后的结果是outValue

//2.Reduce

publicstaticclassTestReducerextendsReducer {

IntWritableoutValue=newIntWritable();

@Override

protectedvoidreduce(Textkey, Iterablevalues, Reducer.Contextcontext)throwsIOException, InterruptedException {

intsum= 0;

for(IntWritablevalue:values) {

sum+=value.get();

}

outValue.set(sum);

context.write(key,outValue)

}

}

至此,Map和Reduce过程结束,但还没有将Map和Reduce串联起来,需要一个Job。

public classTestWordCountextendsConfiguredimplementsTool{

……

publicintrun(String[]args)throwsException {

//读取Hadoop配置

Configurationconfiguration=this.getConf();

//创建job

Jobjob= Job.getInstance(configuration,this.getClass().getSimpleName());

// jar包的入口

job.setJarByClass(this.getClass());

//设置输入数据路径

PathinPath=newPath(args[0]);

FileInputFormat.addInputPath(job,inPath);

//设置输出数据路径

PathoutPath=newPath(args[1]);

FileOutputFormat.setOutputPath(job,outPath);

//指定Mapper类,map输出key和value类型

job.setMapperClass(TestMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定Reducer类,reduce输出key和value类型

job.setReducerClass(TestReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//提交任务

booleanisSuccess=job.waitForCompletion(true);

returnisSuccess ? 0 : 1;

}

}

12、定义Driver,在这里就是main方法

publicclassTestWordCount {

//3.Driver

publicstaticvoidmain(String[]args)throwsException {

Configurationconfiguration=newConfiguration();

args=newString[] {"hdfs://bigdata.com:8020/input",

"hdfs://bigdata.com:8020/output"};

intstatus= ToolRunner.run(configuration,newTestWordCount(),args);

System.exit(status);

}

}

至此,WordCount写了近100行代码。不如按照这个思路自己写一写代码,理解下MapReduce编程模型的思路。

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171218G0UJ7400?refer=cp_1026

扫码关注云+社区