首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce之输出结果排序

MapReduce之输出结果排序

作者头像
用户4919348
发布2019-04-17 14:39:40
2K0
发布2019-04-17 14:39:40
举报
文章被收录于专栏:波波烤鸭波波烤鸭

  前面的案例中我们介绍了统计出每个用户的上行流量,下行流量及总流量,现在我们想要将输出的结果按照总流量倒序排序。

在这里插入图片描述
在这里插入图片描述

实现思路

  MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前会排序),排序的依据是map输出的key。所以我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法来指定比较规则

实现步骤

1.自定义Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * 存储流量相关数据
 * @author 波波烤鸭
 *
 */
public class Flow implements WritableComparable<Flow> {
	// 上下流量
	private long upFlow;

	// 下行流量
	private long downFlow;
	// 总流量
	private long sumFlow;
	
	/**
	 * 比较Flow对象的总流量
	 */
	@Override
	public int compareTo(Flow o) {
		// TODO Auto-generated method stub
		return -(int)(this.sumFlow - o.getSumFlow());

	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public Flow(long upFlow, long downFlow) {
		super();
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow + downFlow;
	}

	/**
	 * 无参构造方法必须要有 反射的时候需要用到
	 */
	public Flow() {
		super();
	}

	/**
	 * 序列化方法
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	@Override
	public String toString() {
		return upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	/**
	 * 反序列化 反序列化的顺序和序列化的顺序一致
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		upFlow = in.readLong();
		downFlow = in.readLong();
		sumFlow = in.readLong();
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
}

2.Map阶段

public class FlowCountMap extends Mapper<LongWritable, Text, Flow, Text> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// 将一行数据转换为String
		String line = value.toString();
		// 切分字段
		String[] fields = line.split("\t");
		// 取出手机号
		String phoneNum = fields[0];
		// 取出上行流量下行流量
		long upFlow = Long.parseLong(fields[fields.length - 3]);
		long downFlow = Long.parseLong(fields[fields.length - 2]);
		Flow flow = new Flow(upFlow, downFlow);
		context.write(flow, new Text(phoneNum));
	}
}

3.Reduce阶段

public class FlowCountReducer extends Reducer<Flow, Text, Text, Flow>{

	@Override
	protected void reduce(Flow flow, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		
		String phone = values.iterator().next().toString();
		// 输出结果
		context.write(new Text(phone), flow);
	}
}

4.启动类

public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration(true);
		conf.set("mapreduce.framework.name", "local");
		// 输出到HDFS文件系统中
		// conf.set("fs.defaultFS", "hdfs://hadoop-node01:9000");
		// 输出到本地文件系统
		conf.set("fs.defaultFS", "file:///");
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(FlowTest.class);
		
		// 指定本job要使用的map/reduce的工具类
		job.setMapperClass(FlowCountMap.class);
		job.setReducerClass(FlowCountReducer.class);
		
		// 指定mapper输出kv的类型
		job.setMapOutputKeyClass(Flow.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Flow.class);
		
		// 指定job的原始文件输入目录
		// 6.设置输出输出类
		FileInputFormat.setInputPaths(job, new Path("c:/tools/bigdata/mr/sort/input/"));
		FileOutputFormat.setOutputPath(job, new Path("c:/tools/bigdata/mr/sort/output/"));
				
		//将job中配置的相关参数,以及job所用的jar包提交给yarn运行
		//job.submit();  waitForCompletion等待执行完成
		boolean flag = job.waitForCompletion(true);
		System.exit(flag?0:1);
	}
}

5.输出结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

成功倒序输出 本案例的目的有两个:

  1. 实现对输出结果排序我们可以在自定义对象的compareTo方法中指定
  2. 如果一次MapReduce任务获取不到我们需要的结果我们可以对输出的结果做多次MapReduce任务。
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019年04月04日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 实现思路
  • 实现步骤
    • 1.自定义Bean
      • 2.Map阶段
        • 3.Reduce阶段
          • 4.启动类
            • 5.输出结果
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档