那么我们先来看看这个项目是干什么的。
假设这里有一份文本,里面写了很多单词,当然要有分隔的输入,如下图:
我们需要统计文本中每个单词出现的次数,我们看看怎么完成的。
1
那么我们先来简单地看看项目逻辑,他到底是怎么完成简单的单词计数的。看图。
很显然,首先我们拿到文本不是立刻就送去MapReduce中处理,而是先通过一个叫做TextInputFormat的类,处理好原有文本的数据,用偏移量逐个表识。然后再传入map中处理。然而map只是对单词进行简单的编号(同时编上1),在再进入reduce类之前,先通过迭代器(图中黑色部分,等会会补上),把map的数据简单的处理,如上图的hadoop为例,迭代器的内容就是<hadoop,iterator(1,1)>,那么在reduce类中,我们只要统计好迭代器中的数值就好了。
2
那我们开始写自己的map类。上代码。
//1、自己的map类
//继承Mapper类,<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 输入的key,输入的value,输出的key,输出的value
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
//创建一个IntWritable类型的对象,给定值为1
IntWritable i = new IntWritable(1);
Text keystr = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String line = value.toString();
//传入每一个map方法的key和value做打印
System.out.println("key : "+key.get()+"--------- value : "+line);
String [] strs = line.split(" ");
for (String str : strs) {
//每一次循环遍历到一个单词就要输出到下一个步骤
keystr.set(str);
System.out.println("map的输出:key : ("+str+",1)");
context.write(keystr, i);
}
/* StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
keystr.set(itr.nextToken());
context.write(keystr, i);
}*/
}
}
这里要给大家普及一下Hadoop的数据类型:
1、IntWritable
2、ByteWritable
3、FloatWritable
4、LongWritable
5、DoubleWritable
6、NullWritable
7、Text (等于常用的String)
这些就不解释了,看看名字就知道了吧。
来看看代码。首先这个MyMapper类要继承Mapper类,这个类后面的参数是什么呢?<LongWritable, Text, Text, IntWritable>,为什么要这样写呢。
实际上这四个参数分别对应<KEYIN, VALUEIN, KEYOUT, VALUEOUT>,也就是输入的key,输入的value,输出的key,输出的value。
很简单,我们来看看上面的代码逻辑,在传入map类之前是不是已经经过了一个叫做TextInputFormat的东西,他把我们的文本变成了偏移量+单词,这就很好的对应了,这里的偏移量(LongWritable)作为KETIN,单词(Text)作为VALUEIN。那么输出到reduce的结果是怎样的呢?是单词+编号(1),因此单词(Text)作为KETOUT,编号(IntWritable)作为VALUEOUT。
然后我们使用的是Mapper类的map方法。这里要注意以下,使用Context来把map中的结果传给reduce,但是要注意contect的使用方法。
我们先把字符串切割成我们需要的数组,就是把空格给切割掉,然后循环遍历数组,每次循环遍历到一个单词就要输出到下个步骤,也就是使用刚才讲的context,但是这里要注意,context是不接受String类型的,因此这里要提前创建好Text类的对象,在每次循环的时候使用set方法构建Text,作为context的参数,同样,Int类型也要使用IntWritable。提一句,这里的循环遍历有比较多的方式,这里的话采用的是foreach的方式,当然效率不高,也可以使用他们已经写好的方式稍微修改也可以,这边方便理解就使用foreach了。
3
好了,我们已经写完了MyMapper,现在开始写MyReducer类,上代码。
//2自己的reduce类
// reduce类的输入,其实就是map类中map方法的输出 输入key 输入value 输出key 输出value
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable countwritable = new IntWritable();
@Override
//Map类的map方法的数据输入到Reduce类的group方法中,得到<text,it(1,1)>,再将这个数据输入reduce类到reduce方法中
protected void reduce(Text inputkey, Iterable<IntWritable> inputvalue,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
//得到了key
String key = inputkey.toString();
//迭代Iterable,把每一个值相加
int count = 0;
//循环遍历迭代器中的所有值,做相加
for (IntWritable intWritable : inputvalue) {
count = count + intWritable.get();
}
//把值设置到IntWritable,等待输出
countwritable.set(count);
System.out.println("reduce输出结果:key : "+key+" , "+ count);
context.write(inputkey, countwritable);
}
}
MyReducer类参数还是一样的,我们要得到的结果是:“hadoop,2”类似与这种形式,因此,KEYOUT:Text,VALUEOUT:IntWritable,KETIN:Text,VALUEIN:IntWriatble。
这边就实现了reduce方法。好,你一看就会觉得很好奇了,为什么reduce方法的参数跟MyReducer类的不一样呢?传进来的不应该是Text类型和IntWritable类型吗,为什么变成了一个Iterable<IntWritable>类型的参数呢?那么请看回去 1 中的图解,黑的那块到底是什么呢?
没错,iterable就是一个迭代器!那么我现在重新补充一下逻辑图,请看:
这里就是把Map类中的map方法的数据输入到Reduce类中的group方法中,得到<text,it(1,1)>,再将这个数据输入到Reduce类的reduce方法中。
那么接下来reduce类,只要把iterable中的每个值进行一个累加不就得到了单词的数量了吗,然后再把这个值做成一个IntWritable类型输出出去不就行了嘛!!太聪明了。
然而实际上逻辑图还没有完整,在map类中还存在一个步骤,慢慢地会讲到。
4
运行类:run方法和main方法(调用run方法)。
//3运行类,run方法,在测试的时候使用main函数,调用这个类的run方法来运行
/**
*
* @param args 参数是要接受main方法得到的参数,在run中使用
* @return
* @throws Exception
*/
public int run(String[] args) throws Exception {
//hadoop的配置的上下文!
Configuration configuration = new Configuration();
//通过上下文,构建一个job实例,并且传入任务名称,单例!
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
//这参数必须添加,否则本地运行没有问题,服务器上运行会报错
job.setJarByClass(MyMapReduce.class);
//设置任务从哪里读取数据?
//调用这个方法的时候,要往args中传入参数,第一个位置上要传入从哪里读数据
Path inputpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inputpath);
//设置任务结果数据保存到哪里?
//调用这个方法的时候,要往args中传入参数,第二个位置上要传入结果数据保存到哪里
Path outputpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputpath);
//设置mapper类的参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setNumReduceTasks(2);
//设置reduce类的参数
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// submit job -> YARN
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
public static void main(String[] args) {
args= new String[]{
"hdfs://192.168.6.129:8020/1.txt",
"hdfs://192.168.6.129:8020/output"
};
MyMapReduce mr = new MyMapReduce();
try {
int success = -1;
success = mr.run(args);
System.out.println("success:"+success);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
首先来看看run方法的参数,为什么是String [],args呢?这里的run方法的参数是要接收main方法的参数在run中使用。其他的就直接看代码即可。
然后是main方法,这里传入参数:输入路径和输出路径。最后成功运行即可。
5
最最最最后,本地跑完以后直接打jar包,传入服务器跑,这里要注意在打包之前要把main方法中的参数给注释掉,因为在服务器上面是直接传参的。
源代码的话已经上传到我的github上,在文件夹:WorldCount中,欢迎大家围观。
程序猿
改变世界