前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MapReduce之自定义InputFormat

MapReduce之自定义InputFormat

作者头像
孙晨c
发布2020-07-21 10:15:55
6310
发布2020-07-21 10:15:55
举报
文章被收录于专栏:无题~无题~

在企业开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决实际问题。 自定义InputFormat步骤如下:

  • (1)自定义一个类继承FilelnputFormat
  • (2)自定义一个类继承RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。
  • (3)在输出时使用SequenceFileOutPutFormat输出合并文件。

无论HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

1. 需求

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value(bytes) 对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

(1)输入数据

在这里插入图片描述
在这里插入图片描述

(2)期望输出文件格式

在这里插入图片描述
在这里插入图片描述

2. 需求分析

  1. 自定义一个类继承FileInputFormat (1)重写isSplitable()方法,返回false,让文件不可切,整个文件作为1片。 (2)重写createRecordReader(),返回自定义的RecordReader对象
  2. 自定义一个类继承RecordReader 在RecordReader中,nextKeyValue()是最重要的方法,返回当前读取到的key-value,如果读到返回true,调用Mapper的map()来处理,否则返回false

3. 编写程序

MyInputFormat.java

代码语言:javascript
复制
/*
 * 1. 改变切片策略,一个文件固定切1片,通过指定文件不可切
 * 
 * 2. 提供RR ,这个RR读取切片的文件名作为key,读取切片的内容封装到bytes作为value
 */
public class MyInputFormat extends FileInputFormat {

	@Override
	public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
		return new MyRecordReader();
	}
	
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		return false;
	}
}

MyRecordReader.java

代码语言:javascript
复制
/*
 * RecordReader从MapTask处理的当前切片中读取数据
 * 
 * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象
 */
public class MyRecordReader extends RecordReader {
	
	private Text key;
	private BytesWritable value;
	
	private String filename;
	private int length;
	
	private FileSystem fs;
	private Path path;
	
	private FSDataInputStream is;
	
	private boolean flag=true;

	// MyRecordReader在创建后,在进入Mapper的run()之前,自动调用
	// 文件的所有内容设置为1个切片,切片的长度等于文件的长度
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

		FileSplit fileSplit=(FileSplit) split;
		
		filename=fileSplit.getPath().getName();
		
		length=(int) fileSplit.getLength();
		
		path=fileSplit.getPath();
		
		//获取当前Job的配置对象
		Configuration conf = context.getConfiguration();
		
		//获取当前Job使用的文件系统
		fs=FileSystem.get(conf);
		
		 is = fs.open(path);
		
	}

	// 读取一组输入的key-value,读到返回true,否则返回false
	// 将文件的名称封装为key,将文件的内容封装为BytesWritable类型的value,返回true
	// 第二次调用nextKeyValue()返回false
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		
		if (flag) {
			
			//实例化对象
			if (key==null) {
				key=new Text();
			}
			
			if (value==null) {
				value=new BytesWritable();
			}
			
			//赋值
			//将文件名封装到key中
			key.set(filename);
			
			// 将文件的内容读取到BytesWritable中
			byte [] content=new byte[length];
			
			IOUtils.readFully(is, content, 0, length);
			
			value.set(content, 0, length);
			
			flag=false;
			
			return true;
			
		}
		return false;
	}

	//返回当前读取到的key-value中的key
	@Override
	public Object getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	//返回当前读取到的key-value中的value
	@Override
	public Object getCurrentValue() throws IOException, InterruptedException {
		return value;
	}

	//返回读取切片的进度
	@Override
	public float getProgress() throws IOException, InterruptedException {
		return 0;
	}

	// 在Mapper的输入关闭时调用,清理工作
	@Override
	public void close() throws IOException {
		if (is != null) {
			IOUtils.closeStream(is);
		}	
		if (fs !=null) {
			fs.close();
		}
	}
}

CustomIFMapper.java

代码语言:javascript
复制
public class CustomIFMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFReducer.java

代码语言:javascript
复制
public class CustomIFReducer extends Reducer<Text, BytesWritable, Text, BytesWritable>{

}

CustomIFDriver.java

代码语言:javascript
复制
public class CustomIFDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("e:/mrinput/custom");
		Path outputPath=new Path("e:/mroutput/custom");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		// 创建Job
		Job job = Job.getInstance(conf);

		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(CustomIFMapper.class);
		job.setReducerClass(CustomIFReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(BytesWritable.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 设置输入和输出格式
		job.setInputFormatClass(MyInputFormat.class);
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		
		// ③运行Job
		job.waitForCompletion(true);
			
	}
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-19 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 需求
  • 2. 需求分析
  • 3. 编写程序
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档