专栏首页写字母的代码哥mapreduce的二次排序-分区分组

mapreduce的二次排序-分区分组

mapreduce的二次排序-分区分组

  • 在0.20.0 以前使用的是 setPartitionerClass setOutputkeyComparatorClass setOutputValueGroupingComparator
  • 在0.20.0以后使用是 job.setPartitionerClass(Partitioner p); 决定数据的分区规则 job.setSortComparatorClass(RawComparator c); 决定数据的排序规则 job.setGroupingComparatorClass(RawComparator c); 决定数据的分组规则
  • 分区与分组的区别 1.在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。 2.每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。 3.如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key数据类型实现的compareTo方法。 4.在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。 5.然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,一个分组就是一个iterable。

二次排序

就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序 的结果 。例如

输入文件

20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8

输出 ------------------------------------------------ 1 2 ------------------------------------------------ 3 4 ------------------------------------------------ 5 6 ------------------------------------------------ 7 8 7 82 ------------------------------------------------ 12 211 ------------------------------------------------ 20 21 20 53 20 522 ------------------------------------------------ 31 42 ------------------------------------------------ 40 511 ------------------------------------------------ 50 51 50 52 50 53 50 53 50 54 50 62 50 512 50 522 ------------------------------------------------ 60 51 60 52 60 53 60 56 60 56 60 57 60 57 60 61 ------------------------------------------------ 63 61 ------------------------------------------------ 70 54 70 55 70 56 70 57 70 58 70 58 ------------------------------------------------ 71 55 71 56 ------------------------------------------------ 73 57 ------------------------------------------------ 74 58 ------------------------------------------------ 203 21 ------------------------------------------------ 530 54 ------------------------------------------------ 730 54 ------------------------------------------------ 740 58

具体步骤:

  1. 自定义key 参见《hadoop数据类型&自定义》
  2. 由于key是自定义的,所以还需要自定义一下函数比较分组分区类:
package secondarySort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySort
{
//自己定义的key类应该实现WritableComparable接口
public static class IntPair implements WritableComparable<IntPair>
{
int first;
int second;
public void set(int left, int right)
{
first = left;
second = right;
}
public int getFirst()
{
return first;
}
public int getSecond()
{
return second;
}
@Override
//反序列化,从流中的二进制转换成IntPair
public void readFields(DataInput in) throws IOException
{
first = in.readInt();
second = in.readInt();
}
@Override
//序列化,将IntPair转化成使用流传送的二进制
public void write(DataOutput out) throws IOException
{
out.writeInt(first);
out.writeInt(second);
}
@Override
//key的比较
public int compareTo(IntPair o)
{
if (first != o.first)
{
return first < o.first ? -1 : 1;
}
else if (second != o.second)
{
return second < o.second ? -1 : 1;
}
else
{
return 0;
}
}
//新定义类应该重写的两个方法
@Override
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)
public int hashCode()
{
return first * 157 + second;
}
@Override
public boolean equals(Object right)
{
if (right == null)
return false;
if (this == right)
return true;
if (right instanceof IntPair)
{
IntPair r = (IntPair) right;
return r.first == first && r.second == second;
}
else
{
return false;
}
}
}
/**
* 分区函数类。根据first确定Partition。
*/
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
{
@Override
public int getPartition(IntPair key, IntWritable value,int numPartitions)
{
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
/**
* 分组函数类。只要first相同就属于同一个组。
*/
/*//第一种方法,实现接口RawComparator
public static class GroupingComparator implements RawComparator<IntPair> {
@Override
public int compare(IntPair o1, IntPair o2) {
int l = o1.getFirst();
int r = o2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
@Override
//一个字节一个字节的比,直到找到一个不相同的字节,然后比这个字节的大小作为两个字节流的大小比较结果。
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
// TODO Auto-generated method stub
return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8,
b2, s2, Integer.SIZE/8);
}
}*/
//第二种方法,继承WritableComparator
public static class GroupingComparator extends WritableComparator
{
protected GroupingComparator()
{
super(IntPair.class, true);
}
@Override
//Compare two WritableComparables.
public int compare(WritableComparable w1, WritableComparable w2)
{
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
int l = ip1.getFirst();
int r = ip2.getFirst();
return l == r ? 0 : (l < r ? -1 : 1);
}
}
// 自定义map
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
{
private final IntPair intkey = new IntPair();
private final IntWritable intvalue = new IntWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
int left = 0;
int right = 0;
if (tokenizer.hasMoreTokens())
{
left = Integer.parseInt(tokenizer.nextToken());
if (tokenizer.hasMoreTokens())
right = Integer.parseInt(tokenizer.nextToken());
intkey.set(left, right);
intvalue.set(right);
context.write(intkey, intvalue);
}
}
}
// 自定义reduce
//
public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
{
private final Text left = new Text();
private static final Text SEPARATOR = new Text("------------------------------------------------");
public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
context.write(SEPARATOR, null);
left.set(Integer.toString(key.getFirst()));
for (IntWritable val : values)
{
context.write(left, val);
}
}
}
/**
* @param args
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
{
// TODO Auto-generated method stub
// 读取hadoop配置
Configuration conf = new Configuration();
// 实例化一道作业
Job job = new Job(conf, "secondarysort");
job.setJarByClass(SecondarySort.class);
// Mapper类型
job.setMapperClass(Map.class);
// 不再需要Combiner类型,因为Combiner的输出类型<Text, IntWritable>对Reduce的输入类型<IntPair, IntWritable>不适用
//job.setCombinerClass(Reduce.class);
// Reducer类型
job.setReducerClass(Reduce.class);
// 分区函数
job.setPartitionerClass(FirstPartitioner.class);
// 分组函数
job.setGroupingComparatorClass(GroupingComparator.class);
// map 输出Key的类型
job.setMapOutputKeyClass(IntPair.class);
// map输出Value的类型
job.setMapOutputValueClass(IntWritable.class);
// rduce输出Key的类型,是Text,因为使用的OutputFormatClass是TextOutputFormat
job.setOutputKeyClass(Text.class);
// rduce输出Value的类型
job.setOutputValueClass(IntWritable.class);
// 将输入的数据集分割成小数据块splites,同时提供一个RecordReder的实现。
job.setInputFormatClass(TextInputFormat.class);
// 提供一个RecordWriter的实现,负责数据输出。
job.setOutputFormatClass(TextOutputFormat.class);
// 输入hdfs路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
// 输出hdfs路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 恕我直言你可能真的不会java第6篇:Stream性能差?不要人云亦云

    问:stream比for循环慢5倍,用这个是为了啥? 答:互联网是一个新闻泛滥的时代,三人成虎,以假乱真的事情时候发生。作为一个技术开发者,要自己去动手去做,...

    字母哥博客
  • hadoop数据类型及自定义

    自定义Hadoop数据类型后,需要明确告诉Hadoop来使用它们。这是 JobConf 所能担当的了。

    字母哥博客
  • Java9-Reactive Stream API响应式编程

    Java 9的 Reactive Streams是对异步流式编程的一种实现。它基于异步发布和订阅模型,具有非阻塞“背压”数据处理的特点。

    字母哥博客
  • Android 单元测试 Robolectric

    通过实现一套 JVM 能够运行的 Android 代码,从而实现脱离 Android 环境进行测试。

    七适散人
  • 编程小知识之 struct 构造函数(C#)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

    用户2615200
  • Spring学习-第一篇:关于读取配置文件

      最近在用Spring大法的框架,便利了我们,但是对于内部完全不知,虽然满足正常业务需求,但是一旦出现问题,解决效率太低,所以准备深入学习一下,准备个一系列,...

    haoming1100
  • 进程调度(一)——FIFO算法

    这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单,只需把一个进程已调入内存的页面,按先后次序链...

    AI那点小事
  • Android仿qq分组管理的第三方库

    本文实例为大家分享了Android仿qq分组管理的第三方库,供大家参考,具体内容如下

    砸漏
  • 如何去除代码中的多次if而引发的一连串面试问题

    小白:不是,真正的工厂模式有两种:工厂方法和抽象工厂。工厂方法使用继承,首先定义一个抽象父类工厂,然后定义子类工厂,把工厂要创建的对象委托给子工厂类,子工厂类实...

    JavaQ
  • Leetcode 169 多数元素

    给定一个大小为 n 的数组,找到其中的多数元素。多数元素是指在数组中出现次数大于 ⌊ n/2 ⌋ 的元素。

    glm233

扫码关注云+社区

领取腾讯云代金券