Hadoop阅读笔记(二)——利用MapReduce求平均数和去重

前言:圣诞节来了,我怎么能虚度光阴呢?!依稀记得,那一年,大家互赠贺卡,短短几行字,字字融化在心里;那一年,大家在水果市场,寻找那些最能代表自己心意的苹果香蕉梨,摸着冰冷的水果外皮,内心早已滚烫。这一年……我在博客园-_-#,希望用dt的代码燃烧脑细胞,温暖小心窝。

上篇Hadoop阅读笔记(一)——强大的MapReduce主要介绍了MapReduce的在大数据集上处理的优势以及运行机制,通过专利数据编写Demo加深了对于MapReduce中输入输出数据结构的细节理解。有了理论上的指导,仍需手捧我的hadoop圣经——Hadoop实战2,继续走完未走过的路(有种怪怪的感觉,我一直在原地踏步)。

正文:实践是检验真理的唯一标准,不知道这是谁说的,但是作为码农,倒也是实用受用的座右铭。除却大牛们能够在阅读高深理论时new一个并发线程,眼睛所到之处,已然可以理清program的精髓所在,好似一台计算机扫描了所看到的代码一般(有点夸张^_^)。作为普罗大众的搬砖者来说,还是通过一些实例来加深对于理论的认识。

今天主要是通过以下两个例子:求平均成绩、去重来加深对MapReduce的理解。

1.如何用MapReduce求平均成绩——WordCount的加强版

在谈平均成绩之前我们回顾下属性的Hadoop HelloWorld程序——WordCount,其主要是统计数据集中各个单词出现的次数。因为次数没有多少之分,如果将这里的次数换成分数就将字数统计问题转化成了求每个个体的总成绩的问题,再加上一步(总成绩/学科数)运算就是这里要讨论的求平均数的问题了。在笔者看来,MapReduce是一种编程思维,它引导码农们如何将一个亟待解决的问题转换为一个MapReduce过程:map阶段输入什么、map过程执行什么、map阶段输出什么、reduce阶段输入什么、执行什么、输出什么。能够将以上几个点弄清楚整明白,一个MapReduce程序就会跃然纸上。这里:

  Map:      指定格式的数据集(如"张三  60")——输入数据

                 执行每条记录的分割操作以key-value写入上下文context中——执行功能

             得到指定键值对类型的输出(如"(new Text(张三),new IntWritable(60))")——输出结果

  Reduce:   map的输出——输入数据

       求出单个个体的总成绩后再除以该个体课程数目——执行功能

         得到指定键值对类型的输入——输出结果

  鉴于上面的map和reduce过程,我们可以得到如下的代码:

 1 public class Test1214 {
 2 
 3     public static class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {
 4         public void map(LongWritable key, Text value, Context context){
 5             String line = value.toString();
 6             System.out.println("该行数据为:" + line);
 7             StringTokenizer token = new StringTokenizer(line,"\t");
 8             String nameT = token.nextToken();
 9             int score = Integer.parseInt(token.nextToken());
10             Text name = new Text(nameT);
11             try {
12                 context.write(name, new IntWritable(score));
13             } catch (IOException e) {
14                 e.printStackTrace();
15             } catch (InterruptedException e) {
16                 e.printStackTrace();
17             }
18         }
19     }
20     
21     public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable>{
22         public void reduce(Text key, Iterable<IntWritable> value, Context context){
23             int sum = 0;
24             int count =0;
25             for(IntWritable score : value){
26                 sum += score.get();
27                 count++;
28                 System.out.println("第" + count + "个数值为:" + score.get());
29             }
30             IntWritable avg = new IntWritable(sum/count);
31             try {
32                 context.write(key, avg);
33             } catch (IOException e) {
34                 e.printStackTrace();
35             } catch (InterruptedException e) {
36                 e.printStackTrace();
37             }
38         }
39     }
40     /**
41      * @param args
42      * @throws IOException 
43      * @throws ClassNotFoundException 
44      * @throws InterruptedException 
45      */
46     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
47         
48         Configuration conf = new Configuration();
49         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
50         if (otherArgs.length != 2) {
51           System.err.println("Usage: wordcount <in> <out>");
52           System.exit(2);
53         }
54         Job job = new Job(conf, "Test1214");
55         
56         job.setJarByClass(Test1214.class);
57         job.setMapperClass(MapperClass.class);
58         job.setCombinerClass(ReducerClass.class);
59         job.setReducerClass(ReducerClass.class);
60         job.setOutputKeyClass(Text.class);
61         job.setOutputValueClass(IntWritable.class);
62         
63         org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
64         org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
65         System.exit(job.waitForCompletion(true) ? 0 : 1);
66         System.out.println("end");
67     }
68 
69 }

数据集:这里的数据是码农我自己手工创建的,主要是想看看mapreduce的运行过程,所以就创建了两个文件,当然这里面的成绩也就没有什么是否符合正态分布的考虑了……

数据中设定有A-K共11个学生,共16门课程,具体数据如下:

  NameScore1.txt:

A	55
B	65
C	44
D	87
E	66
F	90
G	70
H	59
I	61
J	58
K	40
A	45
B	62
C	64
D	77
E	36
F	50
G	80
H	69
I	71
J	70
K	49
A	51
B	64
C	74
D	37
E	76
F	80
G	50
H	51
I	81
J	68
K	80
A	85
B	55
C	49
D	67
E	69
F	50
G	80
H	79
I	81
J	68
K	80
A	35
B	55
C	40
D	47
E	60
F	72
G	76
H	79
I	68
J	78
K	50
A	65
B	45
C	74
D	57
E	56
F	50
G	60
H	59
I	61
J	58
K	60
A	85
B	45
C	74
D	67
E	86
F	70
G	50
H	79
I	81
J	78
K	60
A	50
B	69
C	40
D	89
E	69
F	95
G	75
H	59
I	60
J	59
K	45

NameScore2.txt:

A	65
B	75
C	64
D	67
E	86
F	70
G	90
H	79
I	81
J	78
K	60
A	65
B	82
C	84
D	97
E	66
F	70
G	80
H	89
I	91
J	90
K	69
A	71
B	84
C	94
D	67
E	96
F	80
G	70
H	71
I	81
J	98
K	80
A	85
B	75
C	69
D	87
E	89
F	80
G	70
H	99
I	81
J	88
K	60
A	65
B	75
C	60
D	67
E	80
F	92
G	76
H	79
I	68
J	78
K	70
A	85
B	85
C	74
D	87
E	76
F	60
G	60
H	79
I	81
J	78
K	80
A	85
B	65
C	74
D	67
E	86
F	70
G	70
H	79
I	81
J	78
K	60
A	70
B	69
C	60
D	89
E	69
F	95
G	75
H	59
I	60
J	79
K	65

其执行过程中控制台打印的信息为:

14/12/14 17:05:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/14 17:05:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.JobClient: Running job: job_local_0001
14/12/14 17:05:27 INFO input.FileInputFormat: Total input paths to process : 2
14/12/14 17:05:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
该行数据为:A 55
该行数据为:B 65
该行数据为:C 44
该行数据为:D 87
该行数据为:E 66
该行数据为:F 90
该行数据为:G 70
该行数据为:H 59
该行数据为:I 61
该行数据为:J 58
该行数据为:K 40
该行数据为:A 45
该行数据为:B 62
该行数据为:C 64
该行数据为:D 77
该行数据为:E 36
该行数据为:F 50
该行数据为:G 80
该行数据为:H 69
该行数据为:I 71
该行数据为:J 70
该行数据为:K 49
该行数据为:A 51
该行数据为:B 64
该行数据为:C 74
该行数据为:D 37
该行数据为:E 76
该行数据为:F 80
该行数据为:G 50
该行数据为:H 51
该行数据为:I 81
该行数据为:J 68
该行数据为:K 80
该行数据为:A 85
该行数据为:B 55
该行数据为:C 49
该行数据为:D 67
该行数据为:E 69
该行数据为:F 50
该行数据为:G 80
该行数据为:H 79
该行数据为:I 81
该行数据为:J 68
该行数据为:K 80
该行数据为:A 35
该行数据为:B 55
该行数据为:C 40
该行数据为:D 47
该行数据为:E 60
该行数据为:F 72
该行数据为:G 76
该行数据为:H 79
该行数据为:I 68
该行数据为:J 78
该行数据为:K 50
该行数据为:A 65
该行数据为:B 45
该行数据为:C 74
该行数据为:D 57
该行数据为:E 56
该行数据为:F 50
该行数据为:G 60
该行数据为:H 59
该行数据为:I 61
该行数据为:J 58
该行数据为:K 60
该行数据为:A 85
该行数据为:B 45
该行数据为:C 74
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 50
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 50
该行数据为:B 69
该行数据为:C 40
该行数据为:D 89
该行数据为:E 69
该行数据为:F 95
该行数据为:G 75
该行数据为:H 59
该行数据为:I 60
该行数据为:J 59
该行数据为:K 45
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output
第1个数值为:55
第2个数值为:45
第3个数值为:51
第4个数值为:85
第5个数值为:35
第6个数值为:65
第7个数值为:85
第8个数值为:50
第1个数值为:45
第2个数值为:64
第3个数值为:65
第4个数值为:45
第5个数值为:55
第6个数值为:69
第7个数值为:62
第8个数值为:55
第1个数值为:64
第2个数值为:49
第3个数值为:44
第4个数值为:74
第5个数值为:74
第6个数值为:40
第7个数值为:40
第8个数值为:74
第1个数值为:67
第2个数值为:67
第3个数值为:77
第4个数值为:37
第5个数值为:87
第6个数值为:57
第7个数值为:89
第8个数值为:47
第1个数值为:36
第2个数值为:66
第3个数值为:76
第4个数值为:86
第5个数值为:69
第6个数值为:69
第7个数值为:60
第8个数值为:56
第1个数值为:90
第2个数值为:95
第3个数值为:70
第4个数值为:50
第5个数值为:80
第6个数值为:50
第7个数值为:50
第8个数值为:72
第1个数值为:60
第2个数值为:76
第3个数值为:50
第4个数值为:50
第5个数值为:80
第6个数值为:70
第7个数值为:75
第8个数值为:80
第1个数值为:59
第2个数值为:69
第3个数值为:51
第4个数值为:79
第5个数值为:59
第6个数值为:79
第7个数值为:59
第8个数值为:79
第1个数值为:60
第2个数值为:61
第3个数值为:81
第4个数值为:81
第5个数值为:61
第6个数值为:71
第7个数值为:68
第8个数值为:81
第1个数值为:58
第2个数值为:59
第3个数值为:78
第4个数值为:68
第5个数值为:78
第6个数值为:68
第7个数值为:70
第8个数值为:58
第1个数值为:40
第2个数值为:50
第3个数值为:49
第4个数值为:60
第5个数值为:60
第6个数值为:45
第7个数值为:80
第8个数值为:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/14 17:05:28 INFO mapred.MapTask: io.sort.mb = 100
14/12/14 17:05:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/14 17:05:28 INFO mapred.MapTask: record buffer = 262144/327680
该行数据为:A 65
该行数据为:B 75
该行数据为:C 64
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 90
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 65
该行数据为:B 82
该行数据为:C 84
该行数据为:D 97
该行数据为:E 66
该行数据为:F 70
该行数据为:G 80
该行数据为:H 89
该行数据为:I 91
该行数据为:J 90
该行数据为:K 69
该行数据为:A 71
该行数据为:B 84
该行数据为:C 94
该行数据为:D 67
该行数据为:E 96
该行数据为:F 80
该行数据为:G 70
该行数据为:H 71
该行数据为:I 81
该行数据为:J 98
该行数据为:K 80
该行数据为:A 85
该行数据为:B 75
该行数据为:C 69
该行数据为:D 87
该行数据为:E 89
该行数据为:F 80
该行数据为:G 70
该行数据为:H 99
该行数据为:I 81
该行数据为:J 88
该行数据为:K 60
该行数据为:A 65
该行数据为:B 75
该行数据为:C 60
该行数据为:D 67
该行数据为:E 80
该行数据为:F 92
该行数据为:G 76
该行数据为:H 79
该行数据为:I 68
该行数据为:J 78
该行数据为:K 70
该行数据为:A 85
该行数据为:B 85
该行数据为:C 74
该行数据为:D 87
该行数据为:E 76
该行数据为:F 60
该行数据为:G 60
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 80
该行数据为:A 85
该行数据为:B 65
该行数据为:C 74
该行数据为:D 67
该行数据为:E 86
该行数据为:F 70
该行数据为:G 70
该行数据为:H 79
该行数据为:I 81
该行数据为:J 78
该行数据为:K 60
该行数据为:A 70
该行数据为:B 69
该行数据为:C 60
该行数据为:D 89
该行数据为:E 69
该行数据为:F 95
该行数据为:G 75
该行数据为:H 59
该行数据为:I 60
该行数据为:J 79
该行数据为:K 65
14/12/14 17:05:28 INFO mapred.MapTask: Starting flush of map output
第1个数值为:65
第2个数值为:65
第3个数值为:71
第4个数值为:85
第5个数值为:65
第6个数值为:85
第7个数值为:85
第8个数值为:70
第1个数值为:65
第2个数值为:84
第3个数值为:75
第4个数值为:85
第5个数值为:75
第6个数值为:69
第7个数值为:82
第8个数值为:75
第1个数值为:84
第2个数值为:69
第3个数值为:64
第4个数值为:74
第5个数值为:94
第6个数值为:60
第7个数值为:60
第8个数值为:74
第1个数值为:67
第2个数值为:87
第3个数值为:97
第4个数值为:67
第5个数值为:67
第6个数值为:87
第7个数值为:89
第8个数值为:67
第1个数值为:66
第2个数值为:86
第3个数值为:96
第4个数值为:86
第5个数值为:89
第6个数值为:69
第7个数值为:80
第8个数值为:76
第1个数值为:70
第2个数值为:95
第3个数值为:70
第4个数值为:70
第5个数值为:80
第6个数值为:60
第7个数值为:80
第8个数值为:92
第1个数值为:60
第2个数值为:76
第3个数值为:70
第4个数值为:70
第5个数值为:80
第6个数值为:90
第7个数值为:75
第8个数值为:70
第1个数值为:79
第2个数值为:89
第3个数值为:71
第4个数值为:99
第5个数值为:59
第6个数值为:79
第7个数值为:79
第8个数值为:79
第1个数值为:60
第2个数值为:81
第3个数值为:81
第4个数值为:81
第5个数值为:81
第6个数值为:91
第7个数值为:68
第8个数值为:81
第1个数值为:78
第2个数值为:79
第3个数值为:78
第4个数值为:88
第5个数值为:78
第6个数值为:98
第7个数值为:90
第8个数值为:78
第1个数值为:60
第2个数值为:70
第3个数值为:69
第4个数值为:60
第5个数值为:80
第6个数值为:65
第7个数值为:60
第8个数值为:80
14/12/14 17:05:28 INFO mapred.MapTask: Finished spill 0
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.Merger: Merging 2 sorted segments
14/12/14 17:05:28 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
第1个数值为:58
第2个数值为:73
第1个数值为:76
第2个数值为:57
第1个数值为:57
第2个数值为:72
第1个数值为:78
第2个数值为:66
第1个数值为:64
第2个数值为:81
第1个数值为:77
第2个数值为:69
第1个数值为:67
第2个数值为:73
第1个数值为:79
第2个数值为:66
第1个数值为:70
第2个数值为:78
第1个数值为:83
第2个数值为:67
第1个数值为:58
第2个数值为:68
14/12/14 17:05:28 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/14 17:05:28 INFO mapred.LocalJobRunner: 
14/12/14 17:05:28 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/14 17:05:28 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/user/hadoop/output4
14/12/14 17:05:28 INFO mapred.LocalJobRunner: reduce > reduce
14/12/14 17:05:28 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/14 17:05:28 INFO mapred.JobClient: map 100% reduce 100%
14/12/14 17:05:28 INFO mapred.JobClient: Job complete: job_local_0001
14/12/14 17:05:28 INFO mapred.JobClient: Counters: 14
14/12/14 17:05:28 INFO mapred.JobClient: FileSystemCounters
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_READ=50573
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_READ=2630
14/12/14 17:05:28 INFO mapred.JobClient: FILE_BYTES_WRITTEN=103046
14/12/14 17:05:28 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=55
14/12/14 17:05:28 INFO mapred.JobClient: Map-Reduce Framework
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input groups=11
14/12/14 17:05:28 INFO mapred.JobClient: Combine output records=22
14/12/14 17:05:28 INFO mapred.JobClient: Map input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/14 17:05:28 INFO mapred.JobClient: Reduce output records=11
14/12/14 17:05:28 INFO mapred.JobClient: Spilled Records=44
14/12/14 17:05:28 INFO mapred.JobClient: Map output bytes=1056
14/12/14 17:05:28 INFO mapred.JobClient: Combine input records=176
14/12/14 17:05:28 INFO mapred.JobClient: Map output records=176
14/12/14 17:05:28 INFO mapred.JobClient: Reduce input records=22

最终结果为:
A 65
B 66
C 64
D 72
E 72
F 73
G 70
H 72
I 74
J 75
K 63

  为了更清晰从将要执行的控制台中看到map和reduce过程的执行都进行了那些操作,我们在其中打印了相关信息,这里有自己的两点疑惑需要拿出来闹闹:

  (1).这里我写的程序和书中不一样,没有增加StringTokenizer token = new StringTokenizer(line,"line")这行,事实上我加上去后会出现错误,我的理解是,因为默认格式是TextInputFormat即已经将文件中的文本按照行标示进行分割,即输入给map方法的已经是以一行为单位的记录,所以这里不需要以“\n”进行分割了。书中的做法应该是假定将整个文件拿过来,统一处理,但事实上这里默认的TextInputFormat已经完成了前期工作。(如果执迷不悟这样处理的话,距离来说NameScore1.txt中第一行是“A     55”整个以“\n”分割后就是一个整体了,再以“\t”就无法分割了。)

  (2).从执行过程打印的信息,起初让我有些疑惑,因为从信息来看,似乎是:NameScore1.txt被分割并以每行记录进入map过程,当执行到该文件的最后一行记录时,从打印信息我们似乎看到的是紧接着就去执行reduce过程了,后面的NameScore2.txt也是如此,当两个文件都分别执行了map和reduce时似乎又执行了一次reduce操作。那么事实是不是如此,如果真是这样,这与先前所看到的理论中提到当map执行完后再执行reduce是否有冲突。

  通过查看代码我们发现

  job.setMapperClass(MapperClass.class);   job.setCombinerClass(ReducerClass.class);   job.setReducerClass(ReducerClass.class);

  是的,没错,在这里我们发现了端倪,其真正执行过程是:先执行map,这就是过程信息中打印了每条成绩记录,后面执行的reduce其实是介于map和reduce之间的combiner操作,那么这个Combiner类又为何物,通过神奇的API我们可以发现Combine其实就是一次reduce过程,而且这里明确写了combiner和reduce过程都是使用ReducerClass类,从而就出现了这里为什么对于单个文件都会先执行map然后在reduce,再后来是两个map执行后,才执行的真正的reduce。

  2.去重——阉割版的WordCount

相比于前面的求平均值例子需要添加一些逻辑代码来说,这里的去重更像是阉割版的WordCount。

  如果你还是用传统的思维在考量一个去重的程序需要多少次的判断,如果你还不了解什么是真正的map和reduce。Hadoop中的去重问题被你整复杂了。要知道,当一个map执行完后会对执行的数据进行一个排序,比如按照字母先后顺序;后面会进入combine阶段,这阶段主要是针对key-value中有相同的key就合并;再到reduce阶段,通过迭代器遍历前一阶段合并的各个元素,得到最终的输出结果。

  对于去重来说,我们不在乎一个元素到底出现了几次,只要知道这个元素确实出现了,并能够再最后显示出来就行了,通过map和combiner,我们最终得到的key-value对中的key都是不一样的,也就是说在完成合并的同时就是我们所需要的去重操作(是不是有点绕)。最终reduce输出的就是具有唯一性的去重的元素集合。我们还是按照理清map和reduce的思路来看待这个去重问题:

  map:  数据中的一行记录如"(安徽  jack)"——输入数据

       直接以key-value的方式写入上下文对象context(这里的value并不是我们关心的对象,可以为空)——执行功能

                  得到指定键值对类型的输出如"(new Text(安徽),new Text(""))"——输出结果

  reduce: map的输出——输入数据

                   直接以key-value的方式写入上下文对象context(同样,value并不是我们关心的对象)——执行功能

           得到指定键值对类型的输入——输出结果 

  鉴于以上对于map和reduce的理解,代码如下:

 1 package org.apache.mapreduce;
 2 
 3 import java.io.IOException;
 4 import java.util.Collection;
 5 import java.util.Iterator;
 6 import java.util.StringTokenizer;
 7 
 8 import org.apache.hadoop.conf.Configuration;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.IntWritable;
11 import org.apache.hadoop.io.LongWritable;
12 import org.apache.hadoop.io.Text;
13 import org.apache.hadoop.mapred.TextInputFormat;
14 import org.apache.hadoop.mapreduce.Job;
15 import org.apache.hadoop.mapreduce.Mapper;
16 import org.apache.hadoop.mapreduce.Reducer;
17 import org.apache.hadoop.util.GenericOptionsParser;
18 import org.apache.mapreduce.Test1123.MapperClass;
19 import org.apache.mapreduce.Test1123.ReducerClass;
20 
21 public class Test1215 {
22 
23     public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
24         public void map(LongWritable key, Text value, Context context){
25             
26             try {
27                 context.write(value, new Text(""));
28                 System.out.println("value:" + value);
29             } catch (IOException e) {
30                 e.printStackTrace();
31             } catch (InterruptedException e) {
32                 e.printStackTrace();
33             }
34         }
35     }
36     
37     public static class ReducerClass extends Reducer<Text, Text, Text, Text>{
38         public void reduce(Text key, Iterable<Text> value, Context context){
39             
40             try {
41                 context.write(key, new Text(""));
42                 System.out.println("key:"+key);
43             } catch (IOException e) {
44                 e.printStackTrace();
45             } catch (InterruptedException e) {
46                 e.printStackTrace();
47             }
48         }
49     }
50     /**
51      * @param args
52      * @throws IOException 
53      * @throws ClassNotFoundException 
54      * @throws InterruptedException 
55      */
56     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
57         
58         Configuration conf = new Configuration();
59         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
60         if (otherArgs.length != 2) {
61           System.err.println("Usage: wordcount <in> <out>");
62           System.exit(2);
63         }
64         Job job = new Job(conf, "Test1214");
65         
66         job.setJarByClass(Test1215.class);
67         job.setMapperClass(MapperClass.class);
68         job.setCombinerClass(ReducerClass.class);
69         job.setReducerClass(ReducerClass.class);
70         job.setOutputKeyClass(Text.class);
71         job.setOutputValueClass(Text.class);
72         
73         org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
74         org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
75         System.exit(job.waitForCompletion(true) ? 0 : 1);
76         System.out.println("end");
77     }
78 
79 }

  数据集:手动创建两个文件,每个文件内都有重复元素,两个文件内也有重复元素,具体如下:

  repeat1.txt:

安徽 jack
江苏 jim
江西 lucy
广东 david
上海 smith
安徽 jack
江苏 jim
北京 yemener

  repeat2.txt

江西 lucy
安徽 jack
上海 hanmei
北京 yemener
新疆 afanti
黑龙江 lily
福建 tom
安徽 jack

  通过运行,我们发现控制台打印的信息如下:

14/12/15 21:57:07 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/15 21:57:07 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.JobClient: Running job: job_local_0001
14/12/15 21:57:07 INFO input.FileInputFormat: Total input paths to process : 2
14/12/15 21:57:07 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:07 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:07 INFO mapred.MapTask: record buffer = 262144/327680
value:安徽 jack
value:江苏 jim
value:江西 lucy
value:广东 david
value:上海 smith
value:安徽 jack
value:江苏 jim
value:北京 yemener
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:江苏 jim
key:江西 lucy
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner: 
14/12/15 21:57:08 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/15 21:57:08 INFO mapred.MapTask: io.sort.mb = 100
14/12/15 21:57:08 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/15 21:57:08 INFO mapred.MapTask: record buffer = 262144/327680
value:江西 lucy
value:安徽 jack
value:上海 hanmei
value:北京 yemener
value:新疆 afanti
value:黑龙江 lily
value:福建 tom
value:安徽 jack
14/12/15 21:57:08 INFO mapred.MapTask: Starting flush of map output
key:上海 hanmei
key:北京 yemener
key:安徽 jack
key:新疆 afanti
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.MapTask: Finished spill 0
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner: 
14/12/15 21:57:08 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/15 21:57:08 INFO mapred.LocalJobRunner: 
14/12/15 21:57:08 INFO mapred.Merger: Merging 2 sorted segments
14/12/15 21:57:08 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 212 bytes
14/12/15 21:57:08 INFO mapred.LocalJobRunner: 
key:上海 hanmei
key:上海 smith
key:北京 yemener
key:安徽 jack
key:广东 david
key:新疆 afanti
key:江苏 jim
key:江西 lucy
key:福建 tom
key:黑龙江 lily
14/12/15 21:57:08 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/15 21:57:08 INFO mapred.LocalJobRunner: 
14/12/15 21:57:08 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/15 21:57:08 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://hadoop:9000/user/hadoop/output5
14/12/15 21:57:08 INFO mapred.LocalJobRunner: reduce > reduce
14/12/15 21:57:08 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/15 21:57:08 INFO mapred.JobClient: map 100% reduce 100%
14/12/15 21:57:08 INFO mapred.JobClient: Job complete: job_local_0001
14/12/15 21:57:08 INFO mapred.JobClient: Counters: 14
14/12/15 21:57:08 INFO mapred.JobClient: FileSystemCounters
14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_READ=50584
14/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_READ=507
14/12/15 21:57:08 INFO mapred.JobClient: FILE_BYTES_WRITTEN=102997
14/12/15 21:57:08 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=140
14/12/15 21:57:08 INFO mapred.JobClient: Map-Reduce Framework
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input groups=10
14/12/15 21:57:08 INFO mapred.JobClient: Combine output records=13
14/12/15 21:57:08 INFO mapred.JobClient: Map input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce shuffle bytes=0
14/12/15 21:57:08 INFO mapred.JobClient: Reduce output records=10
14/12/15 21:57:08 INFO mapred.JobClient: Spilled Records=26
14/12/15 21:57:08 INFO mapred.JobClient: Map output bytes=220
14/12/15 21:57:08 INFO mapred.JobClient: Combine input records=16
14/12/15 21:57:08 INFO mapred.JobClient: Map output records=16
14/12/15 21:57:08 INFO mapred.JobClient: Reduce input records=13

  基于以上两个例子的分析,让我们了解map是怎么一回事,reduce又做了什么,在map和reduce之间还有那些猫腻,整个mapreduce是如何一次次的帮助我们完成我们想要的逻辑,当然这里为了方便用的是小数据集,事实上在大数据集上解决这样的问题更能凸显mapreduce高大上和救世主的形象。

  真心觉得Doug Cutting很牛,如何写出这样的框架,低头一想,前面的路还很长。今天就到这,觉得有用,记得点赞哦,你的到来是对我最大的鼓舞^_^

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏HansBug's Lab

1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路

1624: [Usaco2008 Open] Clear And Present Danger 寻宝之路 Time Limit: 5 Sec  Memory L...

3346
来自专栏ml

HDUOJ-----4506小明系列故事——师兄帮帮忙

小明系列故事——师兄帮帮忙 Time Limit: 3000/1000 MS (Java/Others)    Memory Limit: 65535/3276...

2947
来自专栏数据结构与算法

BZOJ2152: 聪聪可可(点分治)

聪聪和可可是兄弟俩,他们俩经常为了一些琐事打起来,例如家中只剩下最后一根冰棍而两人都想吃、两个人都想玩儿电脑(可是他们家只有一台电脑)……遇到这种问题,一般情况...

993
来自专栏章鱼的慢慢技术路

今日头条2018校园春季招聘研发岗位笔试(第一场)经验

1203
来自专栏HansBug's Lab

1708: [Usaco2007 Oct]Money奶牛的硬币

1708: [Usaco2007 Oct]Money奶牛的硬币 Time Limit: 5 Sec  Memory Limit: 64 MB Submit: 5...

2637
来自专栏Fish

poj-3253 优先队列

大神指导说要弄个博客,写写题解。那我就听大神的话,写写题解吧。 这道题的题意就是FJ这个人有一块木板,长度是N(1 ≤ N ≤ 20,000),然后要切成好多块...

2377
来自专栏跟着阿笨一起玩NET

算法~将文件夹下所有文件输出到日志文件中(包括所有子文件夹下的)

算法文章,总是带给我们无穷的思考和兴趣,一个问题,多种解决方法,看你如何去思考它,对于标题所引出的问题,我觉得,使用递归是比较有效的方法,当然递归还有很多使用场...

851
来自专栏跟着阿笨一起玩NET

SQL字符串的分组聚合(ZT)

    今天在看订阅的RSS的时候,看到这么一个问题:T-Sql中如何对分组的信息进行聚合,并以逗号连接字符;也就是对一个表中的某个字段进行分组,然后对另一个字...

2571
来自专栏我和未来有约会

[Silverlight动画]转向行为 - 对象回避

对象回避主题的完整意义是指,在机车行走的路线中存在一些障碍物,机车必须绕开、防止触碰到它们。听上去和碰撞检测有关,然而这仅仅是发生在预测阶段,也就是:“以我当前...

2075
来自专栏C语言及其他语言

【每日一题】问题 1146: 舍罕王的失算

关注我们 题目描述 相传国际象棋是古印度舍罕王的宰相达依尔发明的.舍罕王十分喜爱象棋,决定让宰相自己选择何种赏赐.这位聪明的宰相指着8*8共64格的象棋说:陛...

35111

扫码关注云+社区

领取腾讯云代金券