原数据形式入下
1 2 2 4 2 3 2 1 3 1 3 4 4 1 4 4 4 3 1 1
要求按照第一列的顺序排序,如果第一列相等,那么按照第二列排序
如果利用mapreduce过程的自动排序,只能实现根据第一列排序,现在需要自定义一个继承自WritableComparable接口的类,用该类作为key,就可以利用mapreduce过程的自动排序了。代码如下:
package mapReduce;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException;
import mapReduce.SortApp1.MyMapper; import mapReduce.SortApp1.MyReducer; import mapReduce.SortApp1.NewK2;
import org.apache.Hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SortApp { private static String INPUT_PATH = "hdfs://hadoop1:9000/data"; private static String OUT_PATH = "hdfs://hadoop1:9000/dataOut";
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); Path outputDir = new Path(OUT_PATH); if (fileSystem.exists(outputDir)) { fileSystem.delete(outputDir, true); }
Job job = new Job(conf, "data");
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(KeyValue.class); job.setMapOutputValueClass(LongWritable.class);
job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.waitForCompletion(true);
}
static class MyMapper extends Mapper<LongWritable, Text, KeyValue, LongWritable> { @Override protected void map(LongWritable k1, Text value, Context context) throws IOException, InterruptedException { final String[] splited = value.toString().split("\t"); final KeyValue k2 = new KeyValue(Long.parseLong(splited[0]), Long.parseLong(splited[1])); final LongWritable v2 = new LongWritable(Long.parseLong(splited[1])); context.write(k2, v2); } }
static class MyReducer extends Reducer<KeyValue, LongWritable, LongWritable, LongWritable> { protected void reduce(KeyValue k2,java.lang.Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException { context.write(new LongWritable(k2.first), new LongWritable(k2.second)); } } static class KeyValue implements WritableComparable<KeyValue>{ Long first; Long second;
public KeyValue(){}
public KeyValue(long first, long second){ this.first = first; this.second = second; }
@Override public void readFields(DataInput in) throws IOException { this.first = in.readLong(); this.second = in.readLong(); }
@Override public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); }
@Override public int compareTo(KeyValue o) { final long minus = this.first - o.first; if(minus != 0){ return (int)minus; } return (int)(this.second - o.second); }
public int hashCode() { return this.first.hashCode()+this.second.hashCode(); }
@Override public boolean equals(Object obj) { if (!(obj instanceof KeyValue)){ return false; } KeyValue kv = (KeyValue)obj; return (this.first == kv.first)&& (this.second == kv.second); }
public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } } }
KeyValue 中的first second属性必须写成Long类型,而不是long,否则 this.first.hashCode()不成立。对任何实现WritableComparable的类都能进行排序,这可以一些复杂的数据,只要把他们封装成实现了WritableComparable的类作为key就可以了