首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用MapReduce运行自定义bean案例

使用MapReduce运行自定义bean案例

作者头像
孙晨c
发布2020-07-16 17:45:33
5110
发布2020-07-16 17:45:33
举报
文章被收录于专栏:无题~无题~

如果一个文件的内容不只是简单的单词,而是类似于一个对象那般,有多种属性值,如:

在这里插入图片描述
在这里插入图片描述

在这个文件中,每一行的内容分别代表:手机号、IP、访问网站、上行流量、下行流量、状态码,现在需要统计每个手机号访问网站的上行流量、下行流量以及它们的总和。由于mapper按照每行进行切片,不妨创建一个bean,封装这些属性。

FlowBean.java

public class FlowBean implements Writable{
	
	private long upFlow;//上行流量
	private long downFlow;//下行流量
	private long sumFlow;//流量总和
	
	public FlowBean() {
		
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	// 序列化   在写出属性时,如果为引用数据类型,属性不能为null
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
	}

	//反序列化   序列化和反序列化的顺序要一致
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		downFlow=in.readLong();
		sumFlow=in.readLong();
	}

	@Override
	public String toString() {
		return  upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

}

FlowBeanMapper.java

/*
 * 1. 统计手机号(String)的上行(long,int),下行(long,int),总流量(long,int)
 * 
 * 手机号为key,Bean{上行(long,int),下行(long,int),总流量(long,int)}为value
 */
public class FlowBeanMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	
	private Text out_key=new Text();
	private FlowBean out_value=new FlowBean();
	
	// (0,1	13736230513	192.196.100.1	www.baidu.com	2481	24681	200)
	@Override
	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		//封装手机号
		out_key.set(words[1]);
		// 封装上行
		out_value.setUpFlow(Long.parseLong(words[words.length-3]));
		// 封装下行
		out_value.setDownFlow(Long.parseLong(words[words.length-2]));
		//写出
		context.write(out_key, out_value);
	
	}

}

FlowBeanReducer.java

public class FlowBeanReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	
	private FlowBean out_value=new FlowBean();
	
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
			throws IOException, InterruptedException {
		
		long sumUpFlow=0;
		long sumDownFlow=0;
		
		for (FlowBean flowBean : values) {
			
			sumUpFlow+=flowBean.getUpFlow();
			sumDownFlow+=flowBean.getDownFlow();
			
		}
		
		out_value.setUpFlow(sumUpFlow);
		out_value.setDownFlow(sumDownFlow);
		out_value.setSumFlow(sumDownFlow+sumUpFlow);
		
		context.write(key, out_value);
		
	}

}

FlowBeanDriver.java

public class FlowBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/flowbean");
		Path outputPath=new Path("e:/mroutput/flowbean");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		
		
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {		
			fs.delete(outputPath, true);//保证输出目录不存在
		}
		
		// ①创建Job
		Job job = Job.getInstance(conf);
		
		// ②设置Job
		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// ③运行Job
		job.waitForCompletion(true);
	}

}

统计结果:

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档