前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce排序分类(二)

MapReduce排序分类(二)

原创
作者头像
堕落飞鸟
发布2023-05-12 11:39:46
2870
发布2023-05-12 11:39:46
举报
文章被收录于专栏:飞鸟的专栏飞鸟的专栏

二、外部排序

外部排序是指当数据量太大无法全部载入内存时,需要将数据分割成多个小块进行排序,然后再将排序后的小块合并成一个大的有序块。在MapReduce中,外部排序通常是在Reduce端进行的,即每个Reduce任务将它们处理的数据进行排序,然后将排序后的结果合并成一个有序的输出文件。

外部排序的实现方法有多种,包括归并排序、堆排序、快速排序等。其中,归并排序也是一种常用的排序算法,因为它可以很好地应用于外部排序场景,能够处理大规模数据集。

下面是一个使用归并排序进行外部排序的MapReduce程序示例:

代码语言:javascript
复制
public class ExternalSort {
    public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> {
        private IntWritable score = new IntWritable();
        private Text name = new Text();
        
        public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
            String[] parts = value.toString().split(",");
            name.set(parts[0]);
            score.set(Integer.parseInt(parts[1]));
            context.write(score, name);
        }
    }
    
    public static class Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private List<Text> nameList = new ArrayList<Text>();
        private IntWritable score = new IntWritable();
        
        public void reduce(IntWritable key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
            nameList.clear();
            for (Text value : values) {
                nameList.add(new Text(value));
            }
            Collections.sort(nameList);
            for (Text name : nameList) {
                context.write(name, key);
            }
        }
    }
    
    public static class MergeReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        private List<Text> nameList = new ArrayList<Text>();
        private IntWritable score = new IntWritable();
        
        public void reduce(IntWritable key, Iterable<Text> values, Context context) 
            throws IOException, InterruptedException {
            nameList.clear();
            for (Text value : values) {
                nameList.add(new Text(value));
            }
            Collections.sort(nameList);
            for (Text name : nameList) {
                context.write(name, key);
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "ExternalSort");
        job.setJarByClass(ExternalSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(10);
        job.setSortComparatorClass(IntWritable.Comparator.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1] + "/temp"));
        job.waitForCompletion(true);
        
        Job mergeJob = new Job(conf, "MergeSort");
        mergeJob.setJarByClass(ExternalSort.class);
        mergeJob.setReducerClass(MergeReducer.class);
        mergeJob.setOutputKeyClass(IntWritable.class);
        mergeJob.setOutputValueClass(Text.class);
        mergeJob.setNumReduceTasks(1);
        mergeJob.setSortComparatorClass(IntWritable.Comparator.class);
        for (int i = 0; i < 10; i++) {
            FileInputFormat.addInputPath(mergeJob, new Path(args[1] + "/temp/part-r-" + String.format("%05d", i)));
        }
        FileOutputFormat.setOutputPath(mergeJob, new Path(args[1]));
        System.exit(mergeJob.waitForCompletion(true) ? 0 : 1);
    }
}

在这个例子中,Map任务读入的数据是以“姓名,成绩”的形式存储的。Map任务将每个学生的成绩作为key,姓名作为value输出,以便Reduce任务进行排序。Reduce任务将每个成绩相同的学生姓名存储到一个列表中,然后对列表进行排序,并按照学生姓名从小到大的顺序输出。注意,在这个例子中,Reduce任务的输出key是IntWritable类型,value是Text类型。

为了提高排序的效率,可以使用多个Reduce任务对数据进行排序。在本例中,我们使用了10个Reduce任务对数据进行排序,每个任务处理的数据量约为总数据量的1/10。

在排序完成后,我们需要将所有Reduce任务的输出合并成一个有序的输出文件。这可以通过另一个MapReduce任务来实现,我们称之为“合并排序”任务。在这个任务中,我们只需要将所有Reduce任务的输出作为输入,再次进行排序即可。

需要注意的是,在这个例子中,我们使用了“IntWritable.Comparator”作为排序器。这个排序器是IntWritable类型的默认排序器,它会按照数字的大小进行排序。如果我们要按照其他方式进行排序,例如按照字典序对字符串进行排序,就需要自定义一个排序器,并在MapReduce任务中指定使用该排序器。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 二、外部排序
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档