Sometimes we need the output of a MapReduce job to be globally sorted, meaning the keys are in order across the entire output, not just within each file. By default, however, MapReduce only guarantees that keys are sorted within a single partition; it makes no guarantee across partitions. This article presents three methods for producing globally sorted MapReduce output.
1. Generating test data
2. Sorting with a single Reduce
3. Global order with a custom partition function
Before walking through the implementations, let's generate some test data with the following script:
#!/bin/sh

for i in {1..100000}; do
    echo $RANDOM
done
Save the code above to a file named iteblog.sh, then run:
$ sh iteblog.sh > data1
$ sh iteblog.sh > data2
$ hadoop fs -put data1 /user/iteblog/input
$ hadoop fs -put data2 /user/iteblog/input
$RANDOM is a shell built-in variable that yields a random integer between 0 and 32767 (at most five digits). We ran the script twice, giving us two files of random numbers, data1 and data2, and then uploaded them to HDFS. Now we can write a program to sort the data in these two files.
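If you would rather generate the test data from Java, a rough equivalent of the shell script could look like the sketch below (RandomDataGenerator is a hypothetical helper, not part of the original workflow); nextInt(32768) mirrors $RANDOM's range of [0, 32767]:

import java.util.Random;

public class RandomDataGenerator {
    public static void main(String[] args) {
        Random random = new Random();
        // Emit 100000 random integers in [0, 32767], one per line,
        // matching what `echo $RANDOM` produces in the shell script.
        for (int i = 0; i < 100000; i++) {
            System.out.println(random.nextInt(32768));
        }
    }
}

Redirecting its stdout to a file (java RandomDataGenerator > data1) yields input of the same shape.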
As noted above, MapReduce only guarantees that keys are sorted within a partition, not globally. But if we send all of the data to a single Reduce, the result is naturally globally sorted. This approach is very simple to implement:
package com.iteblog.mapreduce.sort;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class TotalSortV1 extends Configured implements Tool {
    // Parse each input line as an int and emit it as both key and value;
    // the framework sorts map output by key before it reaches the reducer.
    static class SimpleMapper extends
            Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                           Context context) throws IOException, InterruptedException {
            IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
            context.write(intWritable, intWritable);
        }
    }

    // Write every value back out; keys arrive at the reducer already sorted.
    static class SimpleReducer extends
            Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values,
                              Context context) throws IOException, InterruptedException {
            for (IntWritable value : values)
                context.write(value, NullWritable.get());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("<input> <output>");
            System.exit(127);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(TotalSortV1.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SimpleMapper.class);
        job.setReducerClass(SimpleReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // A single reducer sees all of the data, so its output is globally sorted.
        job.setNumReduceTasks(1);
        job.setJobName("TotalSort");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TotalSortV1(), args);
        System.exit(exitCode);
    }
}
The program is straightforward: we use the default TextInputFormat to read the random-number files generated earlier (data1 and data2). Since the files contain non-negative integers, SimpleMapper simply parses each value into an int and wraps it in an IntWritable. By the time the data reaches SimpleReducer, all keys within that Reduce are already sorted; and because the job is configured with exactly one Reduce, the output is globally sorted. Run it as follows:
[iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0.1.jar com.iteblog.mapreduce.sort.TotalSortV1 /user/iteblog/input /user/iteblog/output

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output
Found 2 items
-rw-r--r-- 3 iteblog supergroup 0 2017-05-09 11:41 /user/iteblog/output/_SUCCESS
-rw-r--r-- 3 iteblog supergroup 1131757 2017-05-09 11:41 /user/iteblog/output/part-r-00000

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/part-r-00000 | head -n 10
0
0
0
0
1
1
1
1
1
1

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/part-r-00000 | tail -n 10
32766
32766
32766
32766
32767
32767
32767
32767
32767
32767
As the output shows, the job produced a single data file, and the data inside it is globally sorted.
This approach has a serious limitation, though: all of the data is sent to one Reduce for sorting, which fails to exploit the cluster's computing resources, and with a large dataset it can easily run into OOM problems. Let's analyze why. MapReduce's default partition function is HashPartitioner, which computes the hashCode of each map output key and takes it modulo the number of Reduces, so every key with the same modulo result is sent to the same Reduce.
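For reference, Hadoop's built-in HashPartitioner is essentially the following (paraphrased from the Hadoop source):

import org.apache.hadoop.mapreduce.Partitioner;

// Hadoop's default partitioner: mask the hashCode to keep it non-negative,
// then take it modulo the reducer count. Nothing about this mapping
// preserves key order across partitions.
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}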
If we could instead implement a partition function that sends every key below 10000 to Reduce 0, every key from 10000 up to (but not including) 20000 to Reduce 1, and everything else to Reduce 2, then all of the data in Reduce 0 would be smaller than all of the data in Reduce 1, and all of the data in Reduce 1 smaller than all of the data in Reduce 2. Combined with the fact that data within each Reduce is locally sorted, this gives us globally sorted data. The implementation is as follows:
package com.iteblog.mapreduce.sort;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class TotalSortV2 extends Configured implements Tool {
    static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value,
                           Context context) throws IOException, InterruptedException {
            IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
            context.write(intWritable, intWritable);
        }
    }

    static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values,
                              Context context) throws IOException, InterruptedException {
            for (IntWritable value : values)
                context.write(value, NullWritable.get());
        }
    }

    // Route keys to reducers by range: [0, 10000) -> Reduce 0,
    // [10000, 20000) -> Reduce 1, everything else -> Reduce 2.
    public static class IteblogPartitioner extends Partitioner<IntWritable, IntWritable> {
        @Override
        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
            int keyInt = key.get();
            if (keyInt < 10000) {
                return 0;
            } else if (keyInt < 20000) {
                return 1;
            } else {
                return 2;
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("<input> <output>");
            System.exit(127);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(TotalSortV2.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SimpleMapper.class);
        job.setReducerClass(SimpleReducer.class);
        job.setPartitionerClass(IteblogPartitioner.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // Three reducers, one per key range defined by IteblogPartitioner.
        job.setNumReduceTasks(3);
        job.setJobName("TotalSortV2");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new TotalSortV2(), args);
        System.exit(exitCode);
    }
}
The second version is identical to the first except for the custom IteblogPartitioner and the use of three Reduces. Note that the split points (10000 and 20000) are hard-coded, which works here only because we know $RANDOM produces values in [0, 32767]. Now let's run it:
[iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0.1.jar com.iteblog.mapreduce.sort.TotalSortV2 /user/iteblog/input /user/iteblog/output1

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output1
Found 4 items
-rw-r--r-- 3 iteblog supergroup 0 2017-05-09 13:53 /user/iteblog/output1/_SUCCESS
-rw-r--r-- 3 iteblog supergroup 299845 2017-05-09 13:53 /user/iteblog/output1/part-r-00000
-rw-r--r-- 3 iteblog supergroup 365190 2017-05-09 13:53 /user/iteblog/output1/part-r-00001
-rw-r--r-- 3 iteblog supergroup 466722 2017-05-09 13:53 /user/iteblog/output1/part-r-00002

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00000 | head -n 10
0
0
0
0
1
1
1
1
1
1

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00000 | tail -n 10
9998
9998
9998
9999
9999
9999
9999
9999
9999
9999

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00001 | head -n 10
10000
10000
10000
10000
10000
10000
10001
10001
10001
10001

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00001 | tail -n 10
19997
19997
19998
19998
19998
19998
19999
19999
19999
19999

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00002 | head -n 10
20000
20000
20000
20000
20000
20000
20001
20001
20001
20001

[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00002 | tail -n 10
32766
32766
32766
32766
32767
32767
32767
32767
32767
32767
As expected, the program produced three files (since we set the number of Reduces to 3), each of which is internally sorted: all values below 10000 are in part-r-00000, all values from 10000 up to (but not including) 20000 are in part-r-00001, and all values of 20000 and above are in part-r-00002. Taken together, part-r-00000, part-r-00001, and part-r-00002 form a globally sorted result.
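One refinement worth sketching: because the split points are hard-coded, the job breaks if the key range or the reducer count changes. A minimal generalization, assuming keys stay within $RANDOM's range of [0, 32767] (RangePartitioner is a hypothetical name, not from the original code):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Split the known key range [0, 32767] evenly across however many reducers
// the job is given. The mapping is monotonic in the key, so partition i only
// holds keys smaller than those of partition i + 1, preserving global order.
public class RangePartitioner extends Partitioner<IntWritable, IntWritable> {
    private static final long MAX_KEY = 32767L; // upper bound of $RANDOM

    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int partition = (int) (key.get() * (long) numPartitions / (MAX_KEY + 1));
        return Math.min(partition, numPartitions - 1);
    }
}

Plugging it in via job.setPartitionerClass(RangePartitioner.class) would let setNumReduceTasks take any value without touching the partitioner.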