MapReduce编程模型

通过WordCount程序理解MapReduce编程模型

WordCount,名为单词统计,功能是统计文本文件中每个单词出现的次数。例如下图中,有两个文本(蓝色),其中一个含有两个单词(Hadoop和HDFS),另一个含有两个单词(Hadoop和MapReduce),通过统计计算,最终结果(橙色)中显示Hadoop单词出现2次,HDFS单词出现1次,MapReduce单词出现1次。

WordCount是最简单也是最体现MapReduce思想的程序之一,被成为MapReduce版的HelloWorld。

在部署好Hadoop环境后,一般执行该程序进行测试,在之前的《Hadoop伪分布式环境部署》一文中,我们提到如何运行它。该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。

今天就带大家一步一步的写一下WordCount程序,帮助大家理解MapReduce编程模型。

1、新建一个类TestWordCount

publicclassTestWordCount {

}

2、一个标准的MapReduce分为3个部分:Map、Reduce、Driver

publicclassTestWordCount {

//1.Map

//2.Reduce

//3.Driver

}

3、首先,先写一个Map,定义一个名为TestMapper的内部类,该类继承Mapper

//1.Map

publicstaticclassTestMapperextendsMapper {

}

其中,Mapper类来自org.apache.hadoop.mapreduce包,同时需要定义泛型如下:

//1.Map

publicstaticclassTestMapperextendsMapper {

}

4、Mapper泛型的定义

表示数据输入分片之后的键值对进入map的类型,分片默认“按行”划分,此时KEYIN就是偏移量(第几行),VALUEIN就是那一行文本。

例如文本:

文本按行分为2片,每片作为键值对作为map的输入,每片数据均调用1次map方法:

因此,KEYIN和VALUEIN应该定义为IntWritable和Text类型(这些都是Map Reduce编程模型内部的类型,可以理解为Java里面的Integer和String类型)

//1.Map

publicstaticclassTestMapperextendsMapper {

}

那么KEYOUT和VALUEOUT呢?它们表示map的结果输出。

例如:经过map后,累计这一行每个单词出现的次数,即map的结果是、。

现在你知道KEYOUT和VALUEOUT类型应该是什么了吧?应该是Text和IntWritable。

//1.Map

publicstaticclassTestMapperextendsMapper {

}

5、在TestMapper类中,定义两个用于map输出的成员变量

publicclassTestMapperextendsMapper {

privateTextmapKey=newText();

privateIntWritablemapValue=newIntWritable(1);

……

}

6、重写Mapper类的map方法

//1.Map

publicstaticclassTestMapperextendsMapper {

@Override

protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)

throwsIOException, InterruptedException {

}

}

map方法用于实现具体业务逻辑:

读取文件中每一行

对每一行分割单词

对每个单词做次数统计,生成加1

//1.Map

publicstaticclassTestMapperextendsMapper {

privateTextmapKey=newText();

privateIntWritablemapValue=newIntWritable(1);

@Override

protectedvoidmap(IntWritablekey, Textvalue, Mapper.Contextcontext)

throwsIOException, InterruptedException {

Stringline=value.toString();

String[]strs=line.split(" ");

for(Stringstr:strs) {

mapKey.set(value);

context.write(mapKey,mapValue);

}

}

}

至此,Map过程结束,下面该Reduce过程。

7、写一个Reduce,定义一个名为TestReducer的内部类,该类继承Reducer

//2.Reduce

publicstaticclassTestReducerextendsReducer {

}

8、Reducer泛型的定义

Map的输出就是Reduce的输入,因此Reducer的KEYIN和VALUEIN分别是Text和IntWritable,Reducer的KEYOUT和VALUEOUT是不是也应该是Text和IntWritable呢?因为最终的统计结果每个单词的次数。

//2.Reduce

publicstaticclassTestReducerextendsReducer{

}

9、在TestReducer类中,定义用于reduce输出结果的成员变量

//2.Reduce

publicstaticclassTestReducerextendsReducer {

IntWritableoutValue=newIntWritable();

……

}

10、重写Reducer类的reduce方法

//2.Reduce

publicstaticclassTestReducerextendsReducer {

@Override

protectedvoidreduce(Text key,Iterable values,Reducer.Context context)throwsIOException, InterruptedException {

}

}

根据前面map结果:

对于Hadoop单词出现了2次:

Reduce的作用就是合并汇总,Reduce的Key就是Map输出结果的Key,Value应该是List(1,1),因此Reduce就是把相同key的value计数在一次(reduce方法里面的values是Iterable类型)。

reduce方法用于实现具体业务逻辑:

定义用于累加和的sum

将key的每个value累加到sum

sum累加后的结果是outValue

//2.Reduce

publicstaticclassTestReducerextendsReducer {

IntWritableoutValue=newIntWritable();

@Override

protectedvoidreduce(Textkey, Iterablevalues, Reducer.Contextcontext)throwsIOException, InterruptedException {

intsum= 0;

for(IntWritablevalue:values) {

sum+=value.get();

}

outValue.set(sum);

context.write(key,outValue)

}

}

至此,Map和Reduce过程结束,但还没有将Map和Reduce串联起来,需要一个Job。

public classTestWordCountextendsConfiguredimplementsTool{

……

publicintrun(String[]args)throwsException {

//读取Hadoop配置

Configurationconfiguration=this.getConf();

//创建job

Jobjob= Job.getInstance(configuration,this.getClass().getSimpleName());

// jar包的入口

job.setJarByClass(this.getClass());

//设置输入数据路径

PathinPath=newPath(args[0]);

FileInputFormat.addInputPath(job,inPath);

//设置输出数据路径

PathoutPath=newPath(args[1]);

FileOutputFormat.setOutputPath(job,outPath);

//指定Mapper类,map输出key和value类型

job.setMapperClass(TestMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定Reducer类,reduce输出key和value类型

job.setReducerClass(TestReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//提交任务

booleanisSuccess=job.waitForCompletion(true);

returnisSuccess ? 0 : 1;

}

}

12、定义Driver,在这里就是main方法

publicclassTestWordCount {

//3.Driver

publicstaticvoidmain(String[]args)throwsException {

Configurationconfiguration=newConfiguration();

args=newString[] {"hdfs://bigdata.com:8020/input",

"hdfs://bigdata.com:8020/output"};

intstatus= ToolRunner.run(configuration,newTestWordCount(),args);

System.exit(status);

}

}

至此,WordCount写了近100行代码。不如按照这个思路自己写一写代码,理解下MapReduce编程模型的思路。

本文来自企鹅号 - 大数据与智能硬件媒体

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏PPV课数据科学社区

【学习】七天搞定SAS(二):基本操作(判断、运算、基本函数)

SAS生成新变量 SAS支持基本的加减乘除,值得一提的是它的**代表指数,而不是^。 * Modify homegarden data set with ass...

4564
来自专栏码匠的流水账

聊聊storm的WindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

2224
来自专栏人工智能LeadAI

Spark常用的算子以及Scala函数总结

上海站 | 高性能计算之GPU CUDA培训 4月13-15日 ? 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28...

56812
来自专栏菩提树下的杨过

无限级分类(非递归算法/存储过程版/GUID主键)完整数据库示例_(2)插入记录

-- ======================================== -- Author:  <杨俊明,jimmy.yang@cntvs.c...

2009
来自专栏Albert陈凯

2018-04-17 Java的Collection集合类3分钟搞掂Set集合前言

3分钟搞掂Set集合 前言 声明,本文用的是jdk1.8 现在这篇主要讲Set集合的三个子类: HashSet集合 A:底层数据结构是哈希表(是一个元素为链...

2977
来自专栏码匠的流水账

聊聊storm的WindowedBolt

storm-2.0.0/storm-client/src/jvm/org/apache/storm/topology/IWindowedBolt.java

772
来自专栏跟着阿笨一起玩NET

.Net 2.0中使用扩展方法

1292
来自专栏ml

位运算的方法,大结

Title:       位操作基础篇之位操作全面总结 Author:     MoreWindows E-mail:      morewindows@126...

6078
来自专栏浪淘沙

spark求最受欢迎的老师的问题

1063
来自专栏前端说吧

正则表达式验证汇总

一、收集1  (转自https://blog.csdn.net/jumtre/article/details/13775351)

951

扫码关注云+社区

领取腾讯云代金券