多个字段中如何按其中两个进行排序(二次排序)

多个字段中如何按其中两个进行排序(二次排序)

1 原理

    二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果。

    这里主要讲如何使用一个Mapreduce就可以实现二次排序。Hadoop有自带的SecondarySort程序,但这个程序只能对整数进行排序,所以我们需要对其进行改进,使其可以对任意字符串进行排序。下面会分别列出这两个程序的详解。   

    Hadoop自带的例子中定义的map和reduce如下,关键是它对输入输出类型的定义:(java泛型编程)

        public static  class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>           public static class Reduce extends Reducer<IntPair, NullWritable,  IntWritable, IntWritable>

    在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时 InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReder会将文 本的一行的行号作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable,   Text>的原因。然后调用自定义Map的map方法,将一个个<LongWritable,   Text>对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair,   IntWritable>。最终是生成一个List<IntPair,   IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到 一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次 排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在第一个 例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。      在reduce阶 段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比 较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用 jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们 的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方 法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。   2 Hadoop自带的只对两个整型进行排序例子详解

2.1 测试数据如下所示:

20 21 50 51 50 52 50 53 50 54 60 51 60 53 60 52 60 56 60 57 70 58 60 61 70 54 70 55 70 56 70 57 70 58 1 2 3 4 5 6 7 82 203 21 50 512 50 522 50 53 530 54 40 511 20 53 20 522 60 56 60 57 740 58 63 61 730 54 71 55 71 56 73 57 74 58 12 211 31 42 50 62 7 8 2.2 程序如下所示:(这里的程序是可以直接在Eclipse中提交任务的,需要的其它两个Java文件可查看自己另外一篇帖子http://my.oschina.net/mkh/blog/340112

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Job.JobState;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import service.plugin.EJob;

public class SecondarySort{
    /**
     * @ClassName IntPair
     * @Description 定义IntPair对象,该对象实现WritableComparable接口,描述第一列和第二列数据,同时完成两列数据的相关操作,这里是对二者进行比较
     * 
     */
    public static class IntPair implements WritableComparable<IntPair> {
        int first;
        int second;

        /**
         * Set the left and right values.
         */
        public void set(int left, int right) {
            first = left;
            second = right;
        }

        public int getFirst() {
            return first;
        }

        public int getSecond() {
            return second;
        }

        @Override
        // 反序列化,从流中的二进制转换成IntPair
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            first = in.readInt();
            second = in.readInt();
        }

        @Override
        // 序列化,将IntPair转化成使用流传送的二进制
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(first);
            out.writeInt(second);
        }

        @Override
        // key的比较
        public int compareTo(IntPair o) {
            // TODO Auto-generated method stub
            if (first != o.first) {
                return first < o.first ? -1 : 1;
            } else if (second != o.second) {
                return second < o.second ? -1 : 1;
            } else {
                return 0;
            }
        }

        // 新定义类应该重写的两个方法,不用这个方法好像也可以
        // @Override
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce)
        // public int hashCode() {
        // return first * 157 + second;
        // }

        @Override
        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first == first && r.second == second;
            } else {
                return false;
            }
        }
    }

    /**
     * 分区函数类。根据first确定Partition。
     */
    public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions) {
            System.out.println("FirstPartitioner-----------------------------------------------");
            System.out.println("Math.abs(key.getFirst() * 127) % numPartitions: " + Math.abs(key.getFirst() * 127) % numPartitions);
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    /**
     * 分组函数类。只要first相同就属于同一个组。
     */
    /*
     * //第一种方法,实现接口RawComparator public static class GroupingComparator
     * implements RawComparator<IntPair> {
     * 
     * @Override public int compare(IntPair o1, IntPair o2) { int l =
     * o1.getFirst(); int r = o2.getFirst(); return l == r ? 0 : (l < r ? -1 :
     * 1); }
     * 
     * @Override //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。 public int
     * compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ // TODO
     * Auto-generated method stub return WritableComparator.compareBytes(b1, s1,
     * Integer.SIZE/8, b2, s2, Integer.SIZE/8); } }
     */
    // 第二种方法,继承WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
            System.out.println("GroupingComparator---------------------------------");
        }

        @Override
        // Compare two WritableComparables.
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

    /**
     * @ClassName Map
     * @Description 自定义map类,将每行数据进行分拆,第一列的数据存入left变量,第二列数据存入right变量
     *              在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer
     *              。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。
     */
    public static class Map extends
            Mapper<LongWritable, Text, IntPair, IntWritable> {
        private final IntPair intkey = new IntPair();
        private final IntWritable intvalue = new IntWritable();
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // 调用java自己的工具类StringTokenizer(),将map输入的每行字符串按规则进行分割成每个字符串,这些规则有\t\n\r\f,基本上分割的结果都可以保证到最细的字符串粒度
            StringTokenizer tokenizer = new StringTokenizer(line);
            int left = 0;
            int right = 0;
            if (tokenizer.hasMoreTokens()) {
                left = Integer.parseInt(tokenizer.nextToken());
                System.out.println("left: " + left);
                if (tokenizer.hasMoreTokens())
                    right = Integer.parseInt(tokenizer.nextToken());
                intkey.set(left, right);
                intvalue.set(right);
                context.write(intkey, intvalue);
            }
        }
    }

    // 自定义reduce
    public static class Reduce extends
            Reducer<IntPair, IntWritable, Text, IntWritable> {
        private final Text left = new Text();
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        public void reduce(IntPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            System.out.println("------------------------------------------------");
            left.set(Integer.toString(key.getFirst()));
            for (IntWritable val : values) {
                System.out.println("reduce: left " + left + "    , val " + val);
                context.write(left, val);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        // 读取hadoop配置
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/test_in/secondary_sort_data.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/test_out/mr-" + time;

        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        job.setMapperClass(Map.class);
        // 不再需要Combiner类型,因为Combiner的输出类型<Text,
        // IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
        // job.setCombinerClass(Reduce.class);

        // 分区函数
        job.setPartitionerClass(FirstPartitioner.class);
        // 分组函数
        job.setGroupingComparatorClass(GroupingComparator.class);

        // Reducer类型
        job.setReducerClass(Reduce.class);

        // map输出Key的类型
        job.setMapOutputKeyClass(IntPair.class);
        // map输出Value的类型
        job.setMapOutputValueClass(IntWritable.class);
        // reduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // reduce输出Value的类型
        job.setOutputValueClass(IntWritable.class);

        // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出。
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // 提交job
        if (job.waitForCompletion(false)) {
            System.out.println("job ok !");
        } else {
            System.out.println("job error !");
        }
    }
}

2.3 执行结果如下所示:

------------------------------------------------ 1    2 ------------------------------------------------ 3    4 ------------------------------------------------ 5    6 ------------------------------------------------ 7    8 7    82 ------------------------------------------------ 12    211 ------------------------------------------------ 20    21 20    53 20    522 ------------------------------------------------ 31    42 ------------------------------------------------ 40    511 ------------------------------------------------ 50    51 50    52 50    53 50    53 50    54 50    62 50    512 50    522 ------------------------------------------------ 60    51 60    52 60    53 60    56 60    56 60    57 60    57 60    61 ------------------------------------------------ 63    61 ------------------------------------------------ 70    54 70    55 70    56 70    57 70    58 70    58 ------------------------------------------------ 71    55 71    56 ------------------------------------------------ 73    57 ------------------------------------------------ 74    58 ------------------------------------------------ 203    21 ------------------------------------------------ 530    54 ------------------------------------------------ 730    54 ------------------------------------------------ 740    58 3 改进后的二次排序(可对字符串进行排序)

3.1 测试数据如下所示:

import java import java import java import java import1 org import org1 import1 org import2 org2 import org import2 org1 import1 org import1 org import org2 import2 org3             org import org import1 org importin org import org hello time 3.2 程序如下所示:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import service.plugin.EJob;

public class SecondarySortString {
    // 自己定义的key类应该实现WritableComparable接口
    public static class IntPair implements WritableComparable<IntPair> {
        String first;
        String second;

        /**
         * Set the left and right values.
         */
        public void set(String left, String right) {
            first = left;
            second = right;
        }

        public String getFirst() {
            return first;
        }

        public String getSecond() {
            return second;
        }

        // 反序列化,从流中的二进制转换成IntPair
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readUTF();
        }

        // 序列化,将IntPair转化成使用流传送的二进制
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeUTF(second);
        }

        // 重载 compareTo 方法,进行组合键 key 的比较,该过程是默认行为。
        // 分组后的二次排序会隐式调用该方法。
        public int compareTo(IntPair o) {
            if (!first.equals(o.first)) {
                return first.compareTo(o.first);
            } else if (!second.equals(o.second)) {
                return second.compareTo(o.second);
            } else {
                return 0;
            }
        }

        // 新定义类应该重写的两个方法
        // The hashCode() method is used by the HashPartitioner (the default
        // partitioner in MapReduce)
        public int hashCode() {
            return first.hashCode() * 157 + second.hashCode();
        }

        public boolean equals(Object right) {
            if (right == null)
                return false;
            if (this == right)
                return true;
            if (right instanceof IntPair) {
                IntPair r = (IntPair) right;
                return r.first.equals(first) && r.second.equals(second);
            } else {
                return false;
            }
        }
    }

    /**
     * 分区函数类。根据first确定Partition。
     */
    public static class FirstPartitioner extends Partitioner<IntPair, Text> {
        public int getPartition(IntPair key, Text value, int numPartitions) {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * 分组函数类。只要first相同就属于同一个组。
     */
    /*
     * //第一种方法,实现接口RawComparator public static class GroupingComparator
     * implements RawComparator<IntPair> { public int compare(IntPair o1,
     * IntPair o2) { int l = o1.getFirst(); int r = o2.getFirst(); return l == r
     * ? 0 : (l < r ? -1 : 1); }
     * //一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。 public int
     * compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){ return
     * WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2,
     * Integer.SIZE/8); } }
     */
    // 第二种方法,继承WritableComparator
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }
        // Compare two WritableComparables.
        // 重载 compare:对组合键按第一个自然键排序分组
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            String l = ip1.getFirst();
            String r = ip2.getFirst();
            return l.compareTo(r);
        }
    }

    // 自定义map
    public static class Map extends Mapper<LongWritable, Text, IntPair, Text> {
        private final IntPair keyPair = new IntPair();
        String[] lineArr = null;
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if(line.isEmpty()){
                return;
            }
            lineArr = line.split(" ", -1);
            keyPair.set(lineArr[0], lineArr[1]);
            context.write(keyPair, value);
        }
    }

    // 自定义reduce
    public static class Reduce extends Reducer<IntPair, Text, Text, Text> {
        private static final Text SEPARATOR = new Text("------------------------------------------------");
        public void reduce(IntPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(SEPARATOR, null);
            for (Text val : values) {
                context.write(null, val);
            }
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        File jarFile = EJob.createTempJar("bin");
        ClassLoader classLoader = EJob.getClassLoader();
        Thread.currentThread().setContextClassLoader(classLoader);

        Configuration conf = new Configuration(true);
        String[] otherArgs = new String[2];
        otherArgs[0] = "hdfs://192.168.1.100:9000/data/test_in/secondary_sort_data_string.txt";
        String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
        otherArgs[1] = "hdfs://192.168.1.100:9000/data/test_out/mr-" + time;
        
        // 实例化一道作业
        Job job = new Job(conf, "secondarysort");
        job.setJarByClass(SecondarySort.class);
        
        ((JobConf) job.getConfiguration()).setJar(jarFile.toString());
        
        // Mapper类型
        job.setMapperClass(Map.class);
        // 不再需要Combiner类型,因为Combiner的输出类型<Text,
        // IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
        // job.setCombinerClass(Reduce.class);
        // Reducer类型
        job.setReducerClass(Reduce.class);
        // 分区函数
        job.setPartitionerClass(FirstPartitioner.class);
        // 分组函数
        job.setGroupingComparatorClass(GroupingComparator.class);

        // map 输出Key的类型
        job.setMapOutputKeyClass(IntPair.class);
        // map输出Value的类型
        job.setMapOutputValueClass(Text.class);
        // rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
        job.setOutputKeyClass(Text.class);
        // rduce输出Value的类型
        job.setOutputValueClass(Text.class);

        // 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
        job.setInputFormatClass(TextInputFormat.class);
        // 提供一个RecordWriter的实现,负责数据输出。
        job.setOutputFormatClass(TextOutputFormat.class);

        // 输入hdfs路径
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        // 输出hdfs路径
//        FileSystem.get(conf).delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // 提交job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3.3 执行结果如下所示:

------------------------------------------------             org ------------------------------------------------ hello time ------------------------------------------------ import java import java import java import java import org import org import org import org1 import org2 ------------------------------------------------ import1 org import1 org import1 org import1 org import1 org ------------------------------------------------ import2 org1 import2 org2 import2 org3 ------------------------------------------------ importin org

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏二进制文集

Thrift 对象序列化、反序列化-字节数组分析

本篇博客仅分析Thrift对象的序列化、反序列化的字节数组,以及Thrift对象的序列化、反序列化原理。其他源码分析会另开章节~

26820
来自专栏菩提树下的杨过

java:POI导出excel

POI是一个开源项目,专用于java平台上操作MS OFFICE,企业应用开发中可用它方便导出Excel. 下面是使用示例: 1、maven中先添加依赖项 1 ...

37260
来自专栏函数式编程语言及工具

泛函编程(27)-泛函编程模式-Monad Transformer

    经过了一段时间的学习,我们了解了一系列泛函数据类型。我们知道,在所有编程语言中,数据类型是支持软件编程的基础。同样,泛函数据类型Foldable,Mon...

21070
来自专栏Java成神之路

PL/SQL学习笔记_03_存储函数与存储过程

ORACLE 提供可以把 PL/SQL 程序存储在数据库中,并可以在任何地方来运行它。这样就叫存储过程或函数。

8530
来自专栏刘笑江的专栏

learn-haskell

13230
来自专栏Albert陈凯

2018-04-26 Java – Read File to String Examples三种方法把文件读成一个字符串

原文地址:(英文版) https://howtodoinjava.com/core-java/io/java-read-file-to-string-exam...

42560
来自专栏奔跑的蛙牛技术博客

过滤器模式过滤器模式

过滤器模式(Filter Pattern)或标准模式(Criteria Pattern)是一种设计模式,这种模式允许开发人员使用不同的标准来过滤一组对象,通过逻...

17320
来自专栏wannshan(javaer,RPC)

dubbo序列化过程源码分析

先看下dubbo在serialize层的类设计方案 序列化方案的入口,是接口Serialization的实现类。 /** * Serialization. ...

91590
来自专栏CodingToDie

FastSql ORM 实现

FastSql 中 ORM 的实现 Table of Contents 原理 实现 1. 使用注解 2. 反射工具类 3. 简单的 model 4. 注解解析 ...

52360
来自专栏Code_iOS

数据结构:链表

工程代码 Github: Data_Structures_C_Implemention -- Link List

19010

扫码关注云+社区

领取腾讯云代金券