MapReduce示例-WordCount

在之前的文章《Hadoop-本地模式搭建》小编提到了wordcount对单词的统计,当时使用Hadoop自带的hadoop-mapreduce-examples-2.7.3.jar的实现了统计,

命令如下:hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /root/temp/data.txt /root/temp/dataout看了效果,现在小编通过自己编写示例来完成这个功能。

操作的环境:

伪分布式环境:如环境未搭建,请返回文章《伪分布式模式搭建》查看。

需要的jar包

/root/training/hadoop-2.7.3/share/hadoop/common

/root/training/hadoop-2.7.3/share/hadoop/common/lib

/root/training/hadoop-2.7.3/share/hadoop/mapreduce

/root/training/hadoop-2.7.3/share/hadoop/mapreduce/lib

(/root/training/hadoop-2.7.3 此处是小编在搭建环境中约定的文件路径)

实现思路

演示程序中的data.txt需要自己创建,其中的内容为:

I love Beijing

I love China

Beijing is the capital of China

MapReduce的wordcount功能主要分两个功能:一个是分词,一个是统计,分别对应这个Mapper阶段和Reduce阶段

1.在Mapper阶段,将data.txt文件读入,并对一行行的数据进行读取,分词

对应着下面的实现类:WordCountMapper

2.在Reduce阶段,将Mapper的输出作为输入,进行单词的统计,并排序。

对应着项目的实现类:WordCountReducer

3.在两个阶段需要一个主类,实现job

Mapper类的实现

package demo.wc;

importjava.io.IOException;

importorg.apache.hadoop.mapreduce.Mapper;

//public classWordCountMapper extends Mapper {

public classWordCountMapper extends Mapper {

/*

* map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法

* map task在调用map方法时,传递的参数:

* 一行的起始偏移量LongWritable作为key

* 一行的文本内容Text作为value1

*/

@Override

protectedvoid map(LongWritable key1, Text value1, Context context)

throwsIOException, InterruptedException {

/**

* context 代表Mapper的上下文

* 上文 HDFS

* 下文 Reducer

*/

//取出数据 I Love Beijing

Stringdata = value1.toString();

//分词

String[]words = data.split(" ");

for(Stringword: words) {

//k2为单词, v2:为计数

context.write(newText(word), new IntWritable(1));

}

}

}

Reduce类的实现

package demo.wc;

importjava.io.IOException;

//public classWordCountReducer extends Reducer {

public classWordCountReducer extends Reducer {

/*

* reduce方法提供给reduce task进程来调用

*

* reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组

* 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法

* 调用时传递的参数:

*k3:一组kv中的key

*v3:一组kv中所有value的迭代器

*/

@Override

protectedvoid reduce(Text k3, Iterable v3,

Contextcontext) throws IOException, InterruptedException {

/**

* context 待办reduce的上下文

* 上文: Mapper

* 下文: HDFS

*/

//对V3进行求和

inttotal = 0;

for(IntWritablev: v3) {

total+= v.get();

}

//输出 K4单词 V4频率

context.write(k3,new IntWritable(total));

}

}

job主类的实现

package demo.wc;

public classWordCountMain {

publicstatic void main(String[] args) throws Exception {

//创建Job

Jobjob = Job.getInstance(new Configuration());

//指定任务的入口

job.setJarByClass(WordCountMain.class);

//指任务的map和输出的数据类型 k2 v2

job.setMapperClass(WordCountMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(IntWritable.class);

//指定任务的reduce和输出的数据类型 k4 v4

job.setReducerClass(WordCountReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

//指定输入的路径(map) 输出的路径(reduce)

FileInputFormat.setInputPaths(job,new Path(args[0]));

FileOutputFormat.setOutputPath(job,new Path(args[1]));

//执行任务

job.waitForCompletion(true);

}

}

打包

打包内容其实很简单,只是罗列下注意的地方点就好,在这个界面Main class要选择入口类。

运行

要将打包的文件上传到自己定义的目录下(小编放在/root/tmp目录下)

[root@bigdata11temp]# hadoop jar wc.jar /input/data.txt /output/day1215/wc

查看效果:

此篇文章主要是通过编写代码的形式来完成WordCount功能,其中将如何实现做了简单的介绍,如果大家有疑问,可以在文章后面留言一起讨论学习。

在公众号回复 3:获取最新的大数据学习路线,当前获取的大数据学习路线,还在完善中。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180325G0Y13M00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券