分析:以需求一的输出数据作为排序的输入数据,自定义FlowBean,以FlowBean作为Map输出的key,以手机号作为Map输出的value,因为MapReduce程序会对Map阶段输出的key进行排序。
Java 的 compareTo 方法说明:
例如: o1.compareTo(o2);
返回正数的话,当前对象(调用 compareTo 方法的对象 o1)要排在比较对象(compareTo 传参对象 o2)后面;返回负数的话,放在前面;返回 0 表示两者顺序相同。
/**
 * Map-output key holding the four flow counters of one phone-number record.
 *
 * <p>Instances are serialized as four consecutive ints (upFlow, downFlow,
 * upCountFlow, downCountFlow) and are ordered by {@code upCountFlow} in
 * descending order, so the shuffle sort emits the largest values first.
 *
 * <p>The interface is parameterized with {@code FlowBean}; with the raw
 * {@code WritableComparable} type the {@code compareTo(FlowBean)} method would
 * not override {@code compareTo(Object)} and the class would not compile.
 */
public class FlowBean implements WritableComparable<FlowBean> {
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    /** No-arg constructor; all fields stay {@code null} until set or read via {@link #readFields}. */
    public FlowBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param upFlow        value stored as upFlow
     * @param downFlow      value stored as downFlow
     * @param upCountFlow   value stored as upCountFlow (the sort field)
     * @param downCountFlow value stored as downCountFlow
     */
    public FlowBean(Integer upFlow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.upCountFlow = upCountFlow;
        this.downCountFlow = downCountFlow;
    }

    /**
     * Writes the four counters as ints, in the same order {@link #readFields} reads them.
     *
     * @param out sink to serialize into
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    /**
     * Restores the four counters from ints, in the same order {@link #write} emits them.
     *
     * @param in source to deserialize from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readInt();
        downFlow = in.readInt();
        upCountFlow = in.readInt();
        downCountFlow = in.readInt();
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    /** Returns the four counters separated by tab characters. */
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow;
    }

    /**
     * Orders beans by {@code upCountFlow}, largest first.
     *
     * <p>Arguments to {@link Integer#compare} are swapped to get descending
     * order. Unlike a {@code a > b ? -1 : 1} ternary, this returns 0 for equal
     * values, which the {@code Comparable} contract requires
     * ({@code sgn(x.compareTo(y)) == -sgn(y.compareTo(x))}).
     *
     * @param o the bean to compare against
     * @return negative if this bean sorts before {@code o}, positive if after, 0 if equal
     */
    @Override
    public int compareTo(FlowBean o) {
        return Integer.compare(o.upCountFlow, this.upCountFlow);
    }
}
/**
 * Mapper for the flow-sort job: turns each tab-separated input line into a
 * ({@code FlowBean}, phone number) pair so that the framework sorts records
 * by the bean.
 */
public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    /**
     * Splits the line on tabs, uses column 0 as the phone number (V2) and
     * columns 1-4 as upFlow, downFlow, upCountFlow and downCountFlow of the
     * key bean (K2), then emits the pair.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one tab-separated input line
     * @param context sink for the emitted (FlowBean, phone number) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] fields = value.toString().split("\t");

        // V2: the phone number is the first column.
        final Text phoneNumber = new Text(fields[0]);

        // K2: the remaining four columns populate the sortable bean.
        final FlowBean sortKey = new FlowBean(
                Integer.parseInt(fields[1]),
                Integer.parseInt(fields[2]),
                Integer.parseInt(fields[3]),
                Integer.parseInt(fields[4]));

        // Hand K2/V2 to the framework.
        context.write(sortKey, phoneNumber);
    }
}
/**
 * Reducer for the flow-sort job: swaps key and value back so the output
 * lines are (phone number, FlowBean).
 */
public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    /**
     * Writes one output record per incoming value, pairing each phone number
     * with the key bean.
     *
     * @param key     the flow bean the values were grouped under
     * @param values  phone numbers received for this key
     * @param context sink for the emitted (phone number, FlowBean) pairs
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (final Text phoneNumber : values) {
            // Output order is value first, key second: phone number then counters.
            context.write(phoneNumber, key);
        }
    }
}
/**
 * Driver for the "mapreduce_flowcountsort" job, which re-reads the flow-count
 * output and sorts it using {@code FlowBean} as the map-output key.
 *
 * <p>Usage: {@code JobMain [inputPath [outputPath]]}. Omitted arguments fall
 * back to the default HDFS locations below.
 */
public class JobMain extends Configured implements Tool {
    /** Input location used when no first argument is given. */
    private static final String DEFAULT_INPUT_PATH = "hdfs://node01:8020/out/flowcount_out";
    /** Output location used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "hdfs://node01:8020/out/flowcountsort_out";

    /**
     * Configures and runs the sort job, blocking until it finishes.
     *
     * @param strings optional arguments: {@code [0]} input path, {@code [1]} output path
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        String inputPath = (strings != null && strings.length > 0) ? strings[0] : DEFAULT_INPUT_PATH;
        String outputPath = (strings != null && strings.length > 1) ? strings[1] : DEFAULT_OUTPUT_PATH;

        // Create the job object.
        Job job = Job.getInstance(super.getConf(), "mapreduce_flowcountsort");
        // Needed so the cluster can locate the jar when the job is packaged and submitted.
        job.setJarByClass(JobMain.class);

        // Step 1: input format and input path (produces K1/V1).
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(inputPath));

        // Step 2: mapper class and its output types (K2/V2).
        job.setMapperClass(FlowCountSortMapper.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Steps 3-6 (partition, sort, combine, group) use the defaults.

        // Step 7: reducer class and its output types.
        job.setReducerClass(FlowCountSortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Number of reducers: keys are only sorted within a single reducer's
        // input, so one reducer is required for a globally ordered result.
        // Set it explicitly instead of relying on the cluster default.
        job.setNumReduceTasks(1);

        // Step 8: output format and output path.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(outputPath));

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /**
     * Launches the job through {@code ToolRunner} and exits with its status code.
     *
     * @param args command-line arguments forwarded to {@link #run}
     * @throws Exception if the job fails to start or run
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Start the job.
        int exitCode = ToolRunner.run(configuration, new JobMain(), args);
        System.exit(exitCode);
    }
}