1. Averaging Scores with MapReduce: an Enhanced WordCount

Map:

- Input: a dataset of fixed-format records, e.g. "Zhangsan	60"
- Function: split each record and write it to the Context as a key-value pair
- Output: a key-value pair of the declared types, e.g. (new Text("Zhangsan"), new IntWritable(60))

Reduce:

- Input: the output of map
- Function: sum each individual's scores, then divide by the number of courses that individual took
- Output: a key-value pair of the declared types

Given the map and reduce steps above, we arrive at the following code:

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Test1214 {

    public static class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context) {
            // Each call receives one line, e.g. "A\t55".
            String line = value.toString();
            System.out.println("line: " + line);
            StringTokenizer token = new StringTokenizer(line, "\t");
            String nameT = token.nextToken();
            int score = Integer.parseInt(token.nextToken());
            Text name = new Text(nameT);
            try {
                context.write(name, new IntWritable(score));
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> value, Context context) {
            int sum = 0;
            int count = 0;
            for (IntWritable score : value) {
                sum += score.get();
                count++;
                System.out.println("value #" + count + ": " + score.get());
            }
            IntWritable avg = new IntWritable(sum / count); // integer average
            try {
                context.write(key, avg);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Test1214");

        job.setJarByClass(Test1214.class);
        job.setMapperClass(MapperClass.class);
        job.setCombinerClass(ReducerClass.class); // ReducerClass also runs as a local combine step
        job.setReducerClass(ReducerClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        boolean success = job.waitForCompletion(true);
        System.out.println("end");
        System.exit(success ? 0 : 1);
    }
}
```

NameScore1.txt:

```
A	55
B	65
C	44
D	87
E	66
F	90
G	70
H	59
I	61
J	58
K	40
A	45
B	62
C	64
D	77
E	36
F	50
G	80
H	69
I	71
J	70
K	49
A	51
B	64
C	74
D	37
E	76
F	80
G	50
H	51
I	81
J	68
K	80
A	85
B	55
C	49
D	67
E	69
F	50
G	80
H	79
I	81
J	68
K	80
A	35
B	55
C	40
D	47
E	60
F	72
G	76
H	79
I	68
J	78
K	50
A	65
B	45
C	74
D	57
E	56
F	50
G	60
H	59
I	61
J	58
K	60
A	85
B	45
C	74
D	67
E	86
F	70
G	50
H	79
I	81
J	78
K	60
A	50
B	69
C	40
D	89
E	69
F	95
G	75
H	59
I	60
J	59
K	45
```

NameScore2.txt:

```
A	65
B	75
C	64
D	67
E	86
F	70
G	90
H	79
I	81
J	78
K	60
A	65
B	82
C	84
D	97
E	66
F	70
G	80
H	89
I	91
J	90
K	69
A	71
B	84
C	94
D	67
E	96
F	80
G	70
H	71
I	81
J	98
K	80
A	85
B	75
C	69
D	87
E	89
F	80
G	70
H	99
I	81
J	88
K	60
A	65
B	75
C	60
D	67
E	80
F	92
G	76
H	79
I	68
J	78
K	70
A	85
B	85
C	74
D	87
E	76
F	60
G	60
H	79
I	81
J	78
K	80
A	85
B	65
C	74
D	67
E	86
F	70
G	70
H	79
I	81
J	78
K	60
A	70
B	69
C	60
D	89
E	69
F	95
G	75
H	59
I	60
J	79
K	65
```

Console output from the run:

```
14/12/14 17:05:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/14 17:05:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.JobClient: Running job: job_local_0001
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680

14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output

14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680

14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output

14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.Merger: Merging 2 sorted segments
14/12/14 17:05:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
14/12/14 17:05:28 INFO mapred.LocalJobRunner:

14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner:
14/12/14 17:05:28 INFO mapred.LocalJobRunner: reduce > reduce
14/12/14 17:05:28 INFO mapred.JobClient: map 100% reduce 100%
14/12/14 17:05:28 INFO mapred.JobClient: Job complete: job_local_0001
14/12/14 17:05:28 INFO mapred.JobClient: Counters: 14
14/12/14 17:05:28 INFO mapred.JobClient: FileSystemCounters
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103046
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
14/12/14 17:05:28 INFO mapred.JobClient: Map-Reduce Framework
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input groups=11
14/12/14 17:05:28 INFO mapred.JobClient: Combine output records=22
14/12/14 17:05:28 INFO mapred.JobClient: Map input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/14 17:05:28 INFO mapred.JobClient: Reduce output records=11
14/12/14 17:05:28 INFO mapred.JobClient: Spilled Records=44
14/12/14 17:05:28 INFO mapred.JobClient: Map output bytes=1056
14/12/14 17:05:28 INFO mapred.JobClient: Combine input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Map output records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input records=22

A 65
B 66
C 64
D 72
E 72
F 73
G 70
H 72
I 74
J 75
K 63
```

To see more clearly from the console what the map and reduce phases actually do, the program prints some diagnostic information along the way. Two things puzzled me and are worth raising:

(1) My program differs from the book's: I did not add the line `StringTokenizer token = new StringTokenizer(line, "\n")`, and in fact adding it caused an error. My understanding is that the default input format, TextInputFormat, has already split the file on line boundaries, so each call to map receives a single-line record and there is no need to split on "\n" again. The book apparently assumes the whole file is handed over in one piece and processed as a unit, but TextInputFormat has already done that preliminary work. (If you insist on processing it that way anyway, take the first line of NameScore1.txt, "A	55", as an example: after splitting on "\n" it is one indivisible token, and the subsequent "\t" split never takes effect.)
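To illustrate point (1), here is a minimal off-cluster sketch (the `parseScore` helper is mine for illustration, not part of the job) showing that a record arriving at map is already a single line, so the "\t" split alone is sufficient:

```java
import java.util.StringTokenizer;

public class LineSplitDemo {
    // Mimics what the map method receives: TextInputFormat has already cut
    // the file into single-line records, so only the "\t" split remains.
    public static int parseScore(String line) {
        StringTokenizer token = new StringTokenizer(line, "\t");
        token.nextToken();                          // the name, e.g. "A"
        return Integer.parseInt(token.nextToken()); // the score, e.g. 55
    }

    public static void main(String[] args) {
        System.out.println(parseScore("A\t55")); // prints 55
    }
}
```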

(2) The information printed during execution puzzled me at first. From the output it looks as though NameScore1.txt is split into line records that flow through the map phase, and the moment the file's last record is processed, a reduce phase seems to run immediately; NameScore2.txt behaves the same way, and after both files have each gone through map and "reduce", yet another reduce appears to run. Is that really what happens? If so, does it conflict with the theory we saw earlier, which says reduce runs only after map has finished?

Looking at the code, we find:

`job.setMapperClass(MapperClass.class);`
`job.setCombinerClass(ReducerClass.class);`
`job.setReducerClass(ReducerClass.class);`

Yes, this is the telltale sign. The actual execution order is: map runs first, which is why every score record is printed, and the "reduce" that follows each map is really the combine step that sits between map and reduce. So what is this Combiner? The API reveals that a combine is essentially a local reduce pass, and here the code explicitly uses ReducerClass for both the combiner and the reducer. That is why each file appears to go through map and then reduce on its own, and only after both maps have finished does the real reduce run.
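The effect is easy to reproduce off-cluster. Below is a sketch (the `CombinerDemo` class and `avg` helper are mine for illustration) using A's sixteen scores from the two input files: the combiner computes one integer average per map task (one per file), and the real reduce then averages those two partial averages, which is exactly the "A 65" in the job output above. A single-pass average over all sixteen records would give 66 instead, so using an average-computing reducer as the combiner is not, in general, equivalent to running without one:

```java
public class CombinerDemo {
    // Integer average, exactly as ReducerClass computes it (note the truncation).
    public static int avg(int... xs) {
        int sum = 0;
        for (int x : xs) sum += x;
        return sum / xs.length;
    }

    public static void main(String[] args) {
        // A's eight scores in NameScore1.txt and NameScore2.txt.
        int[] file1 = {55, 45, 51, 85, 35, 65, 85, 50};
        int[] file2 = {65, 65, 71, 85, 65, 85, 85, 70};

        // With the combiner: one local average per map task (per file) ...
        int partial1 = avg(file1); // 471 / 8 = 58
        int partial2 = avg(file2); // 591 / 8 = 73
        // ... then the real reduce averages the two partial averages.
        int twoStage = avg(partial1, partial2); // (58 + 73) / 2 = 65, the job's output for A

        // Without a combiner: one average over all sixteen records.
        int[] all = new int[16];
        System.arraycopy(file1, 0, all, 0, 8);
        System.arraycopy(file2, 0, all, 8, 8);
        int direct = avg(all); // 1062 / 16 = 66

        System.out.println(twoStage + " vs " + direct); // prints "65 vs 66"
    }
}
```

This is why combiners are usually reserved for operations that are associative and commutative, such as sums and counts.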

2. Deduplication: a Stripped-Down WordCount

For deduplication we do not care how many times an element occurs; we only need to know that it occurred, and to have it appear once in the final output. After map and the combiner, every key in the resulting key-value pairs is distinct. In other words, the merge itself performs the deduplication we want (a bit circular, I know), and what reduce finally emits is the set of unique elements. As before, let's frame the problem in terms of map and reduce:

map:

- Input: one record of the data, e.g. "安徽 jack"
- Function: write the record straight to the Context as a key-value pair (the value is of no interest here and may be empty)
- Output: a key-value pair of the declared types, e.g. (new Text("安徽 jack"), new Text(""))

reduce:

- Input: the output of map
- Function: write each key straight to the Context (again, the value is of no interest)
- Output: a key-value pair of the declared types
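The heavy lifting is done by the shuffle, which sorts map output and groups identical keys before reduce ever runs. That sort-and-group behaviour can be sketched outside Hadoop with a TreeSet (the `ShuffleDedupDemo` class is mine for illustration, with ASCII stand-ins for records like "安徽 jack"):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class ShuffleDedupDemo {
    // The shuffle sorts map output and groups identical keys, so by the
    // time reduce runs each key is already unique; a TreeSet mimics that.
    public static Set<String> distinctKeys(String... lines) {
        return new TreeSet<>(Arrays.asList(lines));
    }

    public static void main(String[] args) {
        // ASCII stand-ins for records like "安徽 jack".
        Set<String> unique = distinctKeys(
                "anhui jack", "jiangsu jim", "anhui jack", "jiangxi lucy");
        System.out.println(unique); // duplicates collapse; keys come out sorted
    }
}
```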

Given this view of map and reduce, the code is as follows:

```java
package org.apache.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Test1215 {

    public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                // The whole line is the key; the value carries no information.
                context.write(value, new Text(""));
                System.out.println("value:" + value);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class ReducerClass extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> value, Context context) {
            try {
                // Emitting each distinct key once is the whole deduplication.
                context.write(key, new Text(""));
                System.out.println("key:" + key);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @param args
     * @throws IOException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Test1215");

        job.setJarByClass(Test1215.class);
        job.setMapperClass(MapperClass.class);
        job.setCombinerClass(ReducerClass.class);
        job.setReducerClass(ReducerClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        boolean success = job.waitForCompletion(true);
        System.out.println("end");
        System.exit(success ? 0 : 1);
    }
}
```

Data set: two hand-made files, each containing duplicate records internally, with further duplicates shared across the two files:

repeat1.txt:

```
安徽 jack
江苏 jim
江西 lucy
广东 david
上海 smith
安徽 jack
江苏 jim
北京 yemener
```

repeat2.txt:

```
江西 lucy
安徽 jack
上海 hanmei
北京 yemener
新疆 afanti
黑龙江 lily
福建 tom
安徽 jack
```

Running the job, the console prints the following:

```
14/12/15 21:57:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/15 21:57:07 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.JobClient: Running job: job_local_0001
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:07 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:07 INFO mapred.MapTask: record buffer = 262144/327680
value:安徽 jack
value:江苏 jim
value:江西 lucy
value:广东 david
value:上海 smith
value:安徽 jack
value:江苏 jim
value:北京 yemener
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:江苏 jim
key:江西 lucy
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:08 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:08 INFO mapred.MapTask: record buffer = 262144/327680
value:江西 lucy
value:安徽 jack
value:上海 hanmei
value:北京 yemener
value:新疆 afanti
value:黑龙江 lily
value:福建 tom
value:安徽 jack
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 hanmei
key:北京 yemener
key:安徽 jack
key:新疆 afanti
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.Merger: Merging 2 sorted segments
14/12/15 21:57:08 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 212 bytes
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
key:上海 hanmei
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:新疆 afanti
key:江苏 jim
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner:
14/12/15 21:57:08 INFO mapred.LocalJobRunner: reduce > reduce
14/12/15 21:57:08 INFO mapred.JobClient: map 100% reduce 100%
14/12/15 21:57:08 INFO mapred.JobClient: Job complete: job_local_0001
14/12/15 21:57:08 INFO mapred.JobClient: Counters: 14
14/12/15 21:57:08 INFO mapred.JobClient: FileSystemCounters
14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_WRITTEN=102997
14/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=140
14/12/15 21:57:08 INFO mapred.JobClient: Map-Reduce Framework
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input groups=10
14/12/15 21:57:08 INFO mapred.JobClient: Combine output records=13
14/12/15 21:57:08 INFO mapred.JobClient: Map input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/15 21:57:08 INFO mapred.JobClient: Reduce output records=10
14/12/15 21:57:08 INFO mapred.JobClient: Spilled Records=26
14/12/15 21:57:08 INFO mapred.JobClient: Map output bytes=220
14/12/15 21:57:08 INFO mapred.JobClient: Combine input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Map output records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input records=13
```

The analysis of these two examples shows what map does, what reduce does, what goes on between them, and how MapReduce, pass by pass, carries out the logic we want. Small data sets were used here for convenience; it is on genuinely large data sets that solving such problems shows off MapReduce's real power.

I genuinely admire Doug Cutting for building a framework like this, and it reminds me how far I still have to go. That's it for today. If you found this useful, remember to like it; your visit is my biggest encouragement ^_^
