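Hello, everyone! The previous post covered merging; this one covers secondary sort (grouping). The plan is simple: first explain what secondary sort is, then demonstrate it with a worked example.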
What is GroupingComparator grouping (secondary sort)? It groups the data arriving in the Reduce phase by one or more fields of the key.
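Steps for grouping (secondary sort):
(1) Define a class that extends WritableComparator.
(2) Override the compare() method:

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // comparison logic for the business rule
        return result;
    }

(3) Add a constructor that passes the key class to the parent class:

    protected OrderGroupingComparator() {
        // true: let WritableComparator create key instances for compare()
        super(OrderBean.class, true);
    }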
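To make the effect of a grouping comparator concrete, here is a minimal plain-Java sketch with no Hadoop dependency. It models the idea that keys reach the reducer already sorted, and that adjacent keys for which the grouping comparator returns 0 end up in the same group. `GroupingSketch`, `Key`, and `groupAdjacent` are hypothetical names used only for illustration; this is a model of the behavior, not Hadoop's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class GroupingSketch {

    /** Hypothetical composite key: order id plus price. */
    static final class Key {
        final String orderId;
        final double price;

        Key(String orderId, double price) {
            this.orderId = orderId;
            this.price = price;
        }
    }

    /**
     * Splits an already-sorted list into runs of adjacent keys that the
     * grouping comparator treats as equal (compare(...) == 0).
     */
    static List<List<Key>> groupAdjacent(List<Key> sorted, Comparator<Key> grouping) {
        List<List<Key>> groups = new ArrayList<>();
        for (Key k : sorted) {
            boolean startsNewGroup = groups.isEmpty()
                    || grouping.compare(lastOf(groups.get(groups.size() - 1)), k) != 0;
            if (startsNewGroup) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(k);
        }
        return groups;
    }

    private static Key lastOf(List<Key> group) {
        return group.get(group.size() - 1);
    }

    public static void main(String[] args) {
        List<Key> sorted = Arrays.asList(
                new Key("0000001", 222.8),
                new Key("0000001", 33.8),
                new Key("0000002", 722.4));
        // Group on order id only; price is ignored when deciding group boundaries.
        Comparator<Key> byOrderId = Comparator.comparing(k -> k.orderId);
        System.out.println(groupAdjacent(sorted, byOrderId).size()); // prints 2
    }
}
```

Comparing on the order id alone is what lets records with different prices share one group.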
Order ID | Product ID | Amount |
---|---|---|
0000001 | Pdt_01 | 222.8 |
0000001 | Pdt_02 | 33.8 |
0000002 | Pdt_03 | 522.8 |
0000002 | Pdt_04 | 122.4 |
0000002 | Pdt_05 | 722.4 |
0000003 | Pdt_06 | 232.8 |
0000003 | Pdt_02 | 33.8 |
The task is to find the most expensive product in each order.
Expected output:

    1 222.8
    2 722.4
    3 232.8
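(1) Use "order id and amount" together as the key. All order records read in the Map phase are then sorted by order id in ascending order and, where the ids are equal, by amount in descending order, before being sent to Reduce.
(2) On the Reduce side, use a GroupingComparator to gather the key-value pairs with the same order id into one group. The first record of each group is the most expensive product of that order, as shown in the figure below.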
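The two-step analysis can be checked without a cluster. The following minimal plain-Java sketch imitates the sort (order id ascending, amount descending) and then keeps the first record of each order-id group. `TopPerOrderSketch`, `Order`, and `mostExpensivePerOrder` are illustrative names, and the sample rows in `main` are assumed values; the real job achieves the same effect through the key's `compareTo` and the grouping comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class TopPerOrderSketch {

    /** Hypothetical stand-in for one input record. */
    static final class Order {
        final String orderId;
        final String productId;
        final double price;

        Order(String orderId, String productId, double price) {
            this.orderId = orderId;
            this.productId = productId;
            this.price = price;
        }

        @Override
        public String toString() {
            return orderId + "\t" + productId + "\t" + price;
        }
    }

    static List<Order> mostExpensivePerOrder(List<Order> input) {
        // Step (1): sort by order id ascending, then by price descending.
        Comparator<Order> byId = Comparator.comparing(o -> o.orderId);
        Comparator<Order> byPriceDesc = (a, b) -> Double.compare(b.price, a.price);
        List<Order> sorted = new ArrayList<>(input);
        sorted.sort(byId.thenComparing(byPriceDesc));

        // Step (2): records with the same order id are adjacent; keep the first of each run.
        List<Order> result = new ArrayList<>();
        String currentId = null;
        for (Order o : sorted) {
            if (!o.orderId.equals(currentId)) {
                result.add(o);
                currentId = o.orderId;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> input = Arrays.asList(
                new Order("0000002", "Pdt_04", 122.4),
                new Order("0000001", "Pdt_02", 33.8),
                new Order("0000002", "Pdt_05", 722.4),
                new Order("0000001", "Pdt_01", 222.8));
        for (Order o : mostExpensivePerOrder(input)) {
            System.out.println(o);
        }
    }
}
```

Because the sort already places the highest amount first within each order, no further comparison is needed inside a group.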
OrderBean, the composite key:

    package com.buwenbuhuo.groupingcomparator;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    /**
     * @author 卜温不火
     * @create 2020-04-24 23:43
     */
    public class OrderBean implements WritableComparable<OrderBean> {

        private String orderId;
        private String productId;
        private double price;

        @Override
        public String toString() {
            return orderId + "\t" + productId + "\t" + price;
        }

        public String getOrderId() {
            return orderId;
        }

        public void setOrderId(String orderId) {
            this.orderId = orderId;
        }

        public String getProductId() {
            return productId;
        }

        public void setProductId(String productId) {
            this.productId = productId;
        }

        public double getPrice() {
            return price;
        }

        public void setPrice(double price) {
            this.price = price;
        }

        // Sort order: order id ascending, then price descending.
        @Override
        public int compareTo(OrderBean o) {
            int compare = this.orderId.compareTo(o.orderId);
            if (compare == 0) {
                return Double.compare(o.price, this.price);
            } else {
                return compare;
            }
        }

        // Serialization: the field order here must match readFields().
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeUTF(productId);
            out.writeDouble(price);
        }

        // Deserialization: read the fields in the same order write() wrote them.
        @Override
        public void readFields(DataInput in) throws IOException {
            this.orderId = in.readUTF();
            this.productId = in.readUTF();
            this.price = in.readDouble();
        }
    }
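The write and readFields methods must handle the fields in the same order, because the serialized bytes carry no field names. A minimal sketch of that round trip using only `java.io` streams follows. `RoundTripSketch` and its `Bean` class are hypothetical stand-ins that mirror the three fields of `OrderBean` without the Hadoop `WritableComparable` interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripSketch {

    /** Hypothetical bean with the same three fields as OrderBean. */
    static final class Bean {
        String orderId;
        String productId;
        double price;

        void write(DataOutput out) throws IOException {
            out.writeUTF(orderId);
            out.writeUTF(productId);
            out.writeDouble(price);
        }

        void readFields(DataInput in) throws IOException {
            // Same order as write(): two strings, then the double.
            orderId = in.readUTF();
            productId = in.readUTF();
            price = in.readDouble();
        }
    }

    /** Serializes the bean to bytes and reads it back into a fresh instance. */
    static Bean roundTrip(Bean original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            Bean copy = new Bean();
            try (DataInputStream in =
                         new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Bean bean = new Bean();
        bean.orderId = "0000001";
        bean.productId = "Pdt_01";
        bean.price = 222.8;
        Bean copy = roundTrip(bean);
        System.out.println(copy.orderId + "\t" + copy.productId + "\t" + copy.price);
    }
}
```

Swapping two reads in `readFields` would silently assign values to the wrong fields or fail at runtime, which is why the two methods are best kept side by side.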
OrderMapper:

    package com.buwenbuhuo.groupingcomparator;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    /**
     * @author 卜温不火
     * @create 2020-04-24 23:43
     */
    public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

        // Reused for every record; context.write() serializes it immediately.
        private OrderBean orderBean = new OrderBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line: orderId <TAB> productId <TAB> price
            String[] fields = value.toString().split("\t");

            orderBean.setOrderId(fields[0]);
            orderBean.setProductId(fields[1]);
            orderBean.setPrice(Double.parseDouble(fields[2]));

            // All information lives in the key, so the value is empty.
            context.write(orderBean, NullWritable.get());
        }
    }
OrderComparator, the grouping comparator:

    package com.buwenbuhuo.groupingcomparator;

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    /**
     * @author 卜温不火
     * @create 2020-04-24 23:43
     */
    public class OrderComparator extends WritableComparator {

        protected OrderComparator() {
            // true: create OrderBean instances so compare() receives deserialized keys
            super(OrderBean.class, true);
        }

        // Two keys belong to the same group when their order ids are equal; price is ignored.
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            OrderBean oa = (OrderBean) a;
            OrderBean ob = (OrderBean) b;
            return oa.getOrderId().compareTo(ob.getOrderId());
        }
    }
OrderReducer:

    package com.buwenbuhuo.groupingcomparator;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.util.Iterator;

    /**
     * @author 卜温不火
     * @create 2020-04-24 23:43
     */
    public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

        // Records to emit per order. 1 yields only the most expensive product;
        // a larger value yields the top N, since records arrive sorted by price descending.
        private static final int TOP_N = 1;

        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            Iterator<NullWritable> iterator = values.iterator();
            for (int i = 0; i < TOP_N; i++) {
                if (iterator.hasNext()) {
                    // Hadoop refills the key object as the iterator advances,
                    // so key holds the record that belongs to the current value.
                    context.write(key, iterator.next());
                }
            }
        }
    }
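The reducer writes `key` inside a loop over `values`. That produces different records only because Hadoop reuses a single key object and refills it as the value iterator advances. The plain-Java model below illustrates this behavior under simplifying assumptions: `KeyReuseSketch`, `MutableKey`, `valuesOf`, and `firstN` are hypothetical names, and the real mechanism lives inside Hadoop's reduce context.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class KeyReuseSketch {

    /** One shared, mutable key object, standing in for the reducer's key. */
    static final class MutableKey {
        String productId;
        double price;
    }

    /**
     * Returns an iterator over placeholder values for one group. In this
     * simplified model, every call to next() overwrites the shared key with
     * the fields of the record being consumed.
     */
    static Iterator<Object> valuesOf(List<String[]> group, MutableKey sharedKey) {
        Iterator<String[]> records = group.iterator();
        return new Iterator<Object>() {
            @Override
            public boolean hasNext() {
                return records.hasNext();
            }

            @Override
            public Object next() {
                String[] record = records.next();
                sharedKey.productId = record[0];
                sharedKey.price = Double.parseDouble(record[1]);
                return new Object(); // placeholder for the empty value
            }
        };
    }

    /** Mimics the reducer loop: emit the key for the first n values of the group. */
    static List<String> firstN(List<String[]> group, int n) {
        MutableKey key = new MutableKey();
        Iterator<Object> values = valuesOf(group, key);
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < n && values.hasNext(); i++) {
            values.next(); // advancing the iterator refreshes the shared key
            emitted.add(key.productId + "\t" + key.price);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // One group, already sorted by price descending (assumed sample values).
        List<String[]> group = Arrays.asList(
                new String[] {"Pdt_05", "722.4"},
                new String[] {"Pdt_03", "522.8"},
                new String[] {"Pdt_04", "122.4"});
        for (String line : firstN(group, 2)) {
            System.out.println(line);
        }
    }
}
```

A practical consequence: to keep a key beyond the current iteration, copy its fields rather than storing the reference.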
OrderDriver:

    package com.buwenbuhuo.groupingcomparator;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    /**
     * @author 卜温不火
     * @create 2020-04-24 23:43
     */
    public class OrderDriver {

        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance(new Configuration());

            job.setJarByClass(OrderDriver.class);

            job.setMapperClass(OrderMapper.class);
            job.setReducerClass(OrderReducer.class);

            job.setMapOutputKeyClass(OrderBean.class);
            job.setMapOutputValueClass(NullWritable.class);

            // Register the grouping comparator used on the Reduce side.
            job.setGroupingComparatorClass(OrderComparator.class);

            job.setOutputKeyClass(OrderBean.class);
            job.setOutputValueClass(NullWritable.class);

            // Local paths; the output directory must not exist before the job runs.
            FileInputFormat.setInputPaths(job, new Path("d:\\input"));
            FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

            boolean b = job.waitForCompletion(true);
            System.exit(b ? 0 : 1);
        }
    }