专栏首页CoXie带你学编程三种方法实现Hadoop(MapReduce)全局排序(1)

三种方法实现Hadoop(MapReduce)全局排序(1)

我们可能会有些需求要求MapReduce的输出全局有序,这里说的有序是指Key全局有序。但是我们知道,MapReduce默认只是保证同一个分区内的Key是有序的,但是不保证全局有序。基于此,本文提供三种方法来对MapReduce的输出进行全局排序。

目录

1 生成测试数据2 使用一个Reduce进行排序3 自定义分区函数实现全局有序

1 生成测试数据

在介绍如何实现之前,我们先来生成一些测试数据,实现如下:

1#!/bin/sh
2
3for i in {1..100000};do
4        echo $RANDOM
5done;

将上面的代码保存到 iteblog.sh 的文件里面,然后运行

1$ sh iteblog.sh > data1
2$ sh iteblog.sh > data2
3$ hadoop fs -put data1 /user/iteblog/input
4$ hadoop fs -put data2 /user/iteblog/input

$RANDOM 变量是Shell内置的,使用它能够生成五位内的随机正整数。上面我们一共运行了两次,这样我们就有两份随机数文件data1和data2;最后我们把生成的随机数文件上传到HDFS上。现在我们可以来写程序对这两个文件里面的数据进行排序了。

2 使用一个Reduce进行排序

前面我们说了,MapReduce默认只是保证同一个分区内的Key是有序的,但是不保证全局有序。如果我们将所有的数据全部发送到一个Reduce,那么不就可以实现结果全局有序吗?这种方法实现很简单,如下:

 1package com.iteblog.mapreduce.sort;
 2
 3import org.apache.hadoop.conf.Configured;
 4import org.apache.hadoop.fs.Path;
 5import org.apache.hadoop.io.IntWritable;
 6import org.apache.hadoop.io.LongWritable;
 7import org.apache.hadoop.io.NullWritable;
 8import org.apache.hadoop.io.Text;
 9import org.apache.hadoop.mapreduce.Job;
10import org.apache.hadoop.mapreduce.Mapper;
11import org.apache.hadoop.mapreduce.Reducer;
12import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
13import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
14import org.apache.hadoop.util.Tool;
15import org.apache.hadoop.util.ToolRunner;
16
17import java.io.IOException;
18
19public class TotalSortV1 extends Configured implements Tool {
20    static class SimpleMapper extends
21            Mapper<LongWritable, Text, IntWritable, IntWritable> {
22        @Override
23        protected void map(LongWritable key, Text value,
24                           Context context) throws IOException, InterruptedException {
25            IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
26            context.write(intWritable, intWritable);
27        }
28    }
29
30    static class SimpleReducer extends
31            Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
32        @Override
33        protected void reduce(IntWritable key, Iterable<IntWritable> values,
34                              Context context) throws IOException, InterruptedException {
35            for (IntWritable value : values)
36                context.write(value, NullWritable.get());
37        }
38    }
39
40    @Override
41    public int run(String[] args) throws Exception {
42        if (args.length != 2) {
43            System.err.println("<input> <output>");
44            System.exit(127);
45        }
46
47        Job job = Job.getInstance(getConf());
48        job.setJarByClass(TotalSortV1.class);
49        FileInputFormat.addInputPath(job, new Path(args[0]));
50        FileOutputFormat.setOutputPath(job, new Path(args[1]));
51
52        job.setMapperClass(SimpleMapper.class);
53        job.setReducerClass(SimpleReducer.class);
54        job.setMapOutputKeyClass(IntWritable.class);
55        job.setMapOutputValueClass(IntWritable.class);
56        job.setOutputKeyClass(IntWritable.class);
57        job.setOutputValueClass(NullWritable.class);
58        job.setNumReduceTasks(1);
59        job.setJobName("TotalSort");
60        return job.waitForCompletion(true) ? 0 : 1;
61    }
62
63    public static void main(String[] args) throws Exception {
64        int exitCode = ToolRunner.run(new TotalSort(), args);
65        System.exit(exitCode);
66    }
67}

上面程序的实现很简单,我们直接使用TextInputFormat类来读取上面生成的随机数文件(data1data2)。因为文件里面的数据是正整数,所以我们在 SimpleMapper 类里面直接将value转换成int类型,然后赋值给IntWritable。等数据到 SimpleReducer 的时候,同一个Reduce里面的Key已经全部有序;因为我们设置了一个Reduce作业,这样的话,我们就实现了数据全局有序。运行如下:

 1[iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0.1.jar com.iteblog.mapreduce.sort.TotalSortV1 /user/iteblog/input /user/iteblog/output
 2
 3[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output
 4Found 2 items
 5-rw-r--r--   3 iteblog supergroup          0 2017-05-09 11:41 /user/iteblog/output/_SUCCESS
 6-rw-r--r--   3 iteblog supergroup    1131757 2017-05-09 11:41 /user/iteblog/output/part-r-00000
 7
 8[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/part-r-00000 | head -n 10
 90
100
110
120
131
141
151
161
171
181
19
20[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output/part-r-00000 | tail -n 10
2132766
2232766
2332766
2432766
2532767
2632767
2732767
2832767
2932767
3032767

从上面的测试结果也可以看出,我们只生成了一个数据文件,而且这个文件里面的数据已经全局有序了。

3 自定义分区函数实现全局有序

上面实现数据全局有序有个很大的局限性:所有的数据都发送到一个Reduce进行排序,这样不能充分利用集群的计算资源,而且在数据量很大的情况下,很有可能会出现OOM问题。我们分析一下,MapReduce默认的分区函数是HashPartitioner,其实现的原理是计算map输出key的hashCode,然后对Reduce个数求模,这样只要求模结果一样的Key都会发送到同一个Reduce。如果我们能够实现一个分区函数,使得

  • 所有 Key < 10000 的数据都发送到Reduce 0;
  • 所有 10000 < Key < 20000 的数据都发送到Reduce 1;
  • 其余的Key都发送到Reduce 2;

这就实现了Reduce 0的数据一定全部小于Reduce 1,且Reduce 1的数据全部小于Reduce 2,再加上同一个Reduce里面的数据局部有序,这样就实现了数据的全局有序。实现如下:

  1package com.iteblog.mapreduce.sort;
  2
  3import com.iteblog.mapreduce.secondSort.IntPair;
  4import org.apache.hadoop.conf.Configured;
  5import org.apache.hadoop.fs.Path;
  6import org.apache.hadoop.io.IntWritable;
  7import org.apache.hadoop.io.LongWritable;
  8import org.apache.hadoop.io.NullWritable;
  9import org.apache.hadoop.io.Text;
 10import org.apache.hadoop.mapreduce.Job;
 11import org.apache.hadoop.mapreduce.Mapper;
 12import org.apache.hadoop.mapreduce.Partitioner;
 13import org.apache.hadoop.mapreduce.Reducer;
 14import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16import org.apache.hadoop.util.Tool;
 17import org.apache.hadoop.util.ToolRunner;
 18
 19import java.io.IOException;
 20
 21public class TotalSortV2 extends Configured implements Tool {
 22    static class SimpleMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
 23        @Override
 24        protected void map(LongWritable key, Text value,
 25                           Context context) throws IOException, InterruptedException {
 26            IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
 27            context.write(intWritable, intWritable);
 28        }
 29    }
 30
 31    static class SimpleReducer extends Reducer<IntWritable, IntWritable, IntWritable, NullWritable> {
 32        @Override
 33        protected void reduce(IntWritable key, Iterable<IntWritable> values,
 34                              Context context) throws IOException, InterruptedException {
 35            for (IntWritable value : values)
 36                context.write(value, NullWritable.get());
 37        }
 38    }
 39
 40    public static class IteblogPartitioner extends Partitioner<IntWritable, IntWritable> {
 41        @Override
 42        public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
 43            int keyInt = Integer.parseInt(key.toString());
 44            if (keyInt < 10000) {
 45                return 0;
 46            } else if (keyInt < 20000) {
 47                return 1;
 48            } else {
 49                return 2;
 50            }
 51        }
 52    }
 53
 54    @Override
 55    public int run(String[] args) throws Exception {
 56        if (args.length != 2) {
 57            System.err.println("<input> <output>");
 58            System.exit(127);
 59        }
 60
 61        Job job = Job.getInstance(getConf());
 62        job.setJarByClass(TotalSortV2.class);
 63        FileInputFormat.addInputPath(job, new Path(args[0]));
 64        FileOutputFormat.setOutputPath(job, new Path(args[1]));
 65
 66        job.setMapperClass(SimpleMapper.class);
 67        job.setReducerClass(SimpleReducer.class);
 68        job.setPartitionerClass(IteblogPartitioner.class);
 69        job.setMapOutputKeyClass(IntWritable.class);
 70        job.setMapOutputValueClass(IntWritable.class);
 71        job.setOutputKeyClass(IntWritable.class);
 72        job.setOutputValueClass(NullWritable.class);
 73        job.setNumReduceTasks(3);
 74        job.setJobName("dw_subject");
 75        return job.waitForCompletion(true) ? 0 : 1;
 76    }
 77
 78    public static void main(String[] args) throws Exception {
 79        int exitCode = ToolRunner.run(new TotalSortV2(), args);
 80        System.exit(exitCode);
 81    }
 82}
 83第二版的排序实现除了自定义的 IteblogPartitioner,其余的和第一种实现一样。现在我们来运行一下:
 84
 85[iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0.1.jar com.iteblog.mapreduce.sort.TotalSortV2 /user/iteblog/input /user/iteblog/output1
 86
 87[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -ls /user/iteblog/output1
 88Found 4 items
 89-rw-r--r--   3 iteblog supergroup          0 2017-05-09 13:53 /user/iteblog/output1/_SUCCESS
 90-rw-r--r--   3 iteblog supergroup     299845 2017-05-09 13:53 /user/iteblog/output1/part-r-00000
 91-rw-r--r--   3 iteblog supergroup     365190 2017-05-09 13:53 /user/iteblog/output1/part-r-00001
 92-rw-r--r--   3 iteblog supergroup     466722 2017-05-09 13:53 /user/iteblog/output1/part-r-00002
 93
 94[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00000 | head -n 10
 950
 960
 970
 980
 991
1001
1011
1021
1031
1041
105
106[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00000 | tail -n 10
1079998
1089998
1099998
1109999
1119999
1129999
1139999
1149999
1159999
1169999
117
118[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00001 | head -n 10
11910000
12010000
12110000
12210000
12310000
12410000
12510001
12610001
12710001
12810001
129
130[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00001 | tail -n 10
13119997
13219997
13319998
13419998
13519998
13619998
13719999
13819999
13919999
14019999
141
142[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00002 | head -n 10
14320000
14420000
14520000
14620000
14720000
14820000
14920001
15020001
15120001
15220001
153
154[iteblog@www.iteblog.com /home/iteblog]$ hadoop fs -cat /user/iteblog/output1/part-r-00002 | tail -n 10
15532766
15632766
15732766
15832766
15932767
16032767
16132767
16232767
16332767
16432767

我们已经看到了这个程序生成了三个文件(因为我们设置了Reduce个数为3),而且每个文件都是局部有序;所有小于10000的数据都在part-r-00000里面,所有小于20000的数据都在part-r-00001里面,所有大于20000的数据都在part-r-00002里面。part-r-00000、part-r-00001和part-r-00002三个文件实现了全局有序。

本文分享自微信公众号 - CoXie带你学编程(Pythoni521)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-08-12

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 初学Python:写码时应该缩进使用 tab 还是空格?

    在不同的编辑器里tab的长度可能不一致,所以在一个编辑器里用tab设置缩进后,在其它编辑器里看可能缩进就乱了。空格不会出现这个问题,因为空格就占一个字符的位置。

    用户2966292
  • Python学到什么程度才可以去找工作?掌握这4点足够了!

     大家在学习Python的时候,有人会问“Python要学到什么程度才能出去找工作”,对于在Python培训机构学习Python的同学来说这都不是问题,因为按照...

    用户2966292
  • 在Python中10行代码可以执行哪些高端操作?

    Python以其简单的代码赢得了许多开发人员的青睐。为了促进开发人员用Python开发新的模块,从而形成良性循环,Python可以用短代码实现许多有趣的操作。让...

    用户2966292
  • 什么是IPFS / IPNS?

      IPFS,即星际文件系统,是一个内容可寻址的网络。这意味着,您不是要求网络查询特定网站或域名(例如www.ipfsfirst.com),而是要求提供特定内容...

    用户2312802
  • Sensory推出智能家电语音交互解决方案

    Sensory将与美的集团MCA事业部(Midea Microwave and Cleaner Appliances)在2020 CES展示由嵌入式Sensor...

    用户6026865
  • JCJC错别字系统部署

    双击运行SecureCRTPortable.exe,会弹出登陆界面,主机名添加linux服务器的IP,端口默认22,依次填上用户名(root账户为最大权限)、密...

    田春峰-JCJC错别字检测
  • 快手于冰:跟最优秀的人一起追求极致

    LiveVideoStack:今天我们是在快手北京的总部,邀请到快手音视频技术负责人于冰,于老师同时还是8月份LiveVideoStackCon2019北京的联...

    LiveVideoStack
  • 3.2.4页面分配策略

    对于分页式的虚拟内存,在准备执行时,不需要也不可能把一个进程的所有页都读取到内存,因此,操作系统必须决定读取多少页,也就是说,给特定的进程分配多大的内存空间,这...

    week
  • 猿设计7——真电商之类目的那些关系

    看过上一章节相信你已经认识到类目和属性对于描述商品的重要性。隐约觉察出来,类目和属性之间应该有千丝万缕的联系。但是如何围绕类目去建立实体间的关系,还不是很明确,...

    山旮旯的胖子
  • 一起学设计模式 - 访问者模式

    访问者模式: 预留通路,回调实现。它的实现主要就是通过预先定义好调用的通路,在被访问的对象上定义 accept方法,在访问者的对象上定义 visit方法;然后在...

    battcn

扫码关注云+社区

领取腾讯云代金券