前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

作者头像
汤高
发布2018-01-11 16:49:03
1.8K0
发布2018-01-11 16:49:03
举报
文章被收录于专栏:积累沉淀积累沉淀

正文开始前 ,先介绍几个概念

序列化

所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。

反序列化

是指将字节流转回到结构化对象的逆过程

序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储

在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 。RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息

Hadoop使用了自己写的序列化格式Writable,它格式紧凑,速度快,但是它很难用Java以外的语言进行拓展或使用,因为Writable是Hadoop的核心,大多数MapReduce程序都会为键和值使用它

简单来说,RPC协议是让程序员可以调用远程计算机进程上的代码的一套工具

打个比方  就是A通过网络调用B的某个进程方法

通信中的协议是由程序员自己规定的,比如你可以规定说当A向B发送数字1, B就打印hello hadoop, 并返回数字1给A, 如果发送数字2,B就打印hello world并发送数字2给A.  

序列化------------>写  write(DataOutput out)

反序列化-------->读   readFields(DataInput in)

首先声明:我是基于Hadoop2.6.4版本

 一.  Hadoop内置的数据类型

BooleanWritable:标准布尔型数值

ByteWritable:单字节数值

DoubleWritable:双字节数值

FloatWritable:浮点数

IntWritable:整型数

LongWritable:长整型数

Text:使用UTF8格式存储的文本

NullWritable:当<key, value>中的key或value为空时使用

 Hadoop中的数据类型都要实现Writable接口,以便用这些类型定义的数据可以被网络传输和文件存储。

 二. 用户自定义数据类型的实现

     1.继承接口Writable,实现其方法write()和readFields(), 以便该数据能被序列化后完成网络传输或文件输入/输出;

     2.如果该数据需要作为主键key使用,或需要比较数值大小时,则需要实现WritalbeComparable接口,实现其方法write(),readFields(),CompareTo() 。

     3.数据类型,必须要有一个无参的构造方法,为了方便反射,进行创建对象。    

     4.在自定义数据类型中,建议使用java的原生数据类型,最好不要使用Hadoop对原生类型进行封装的数据类型。比如 int x ;//IntWritable 和String s; //Text  等等

下面是一个自定义的数据类型  3D坐标轴 

代码语言:javascript
复制
package com.tg.type;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Point3D implements WritableComparable<Point3D> {
	public float x, y, z;
	public Point3D(float fx, float fy, float fz) {
		this.x = fx;
		this.y = fy;
		this.z = fz;
	}
	public Point3D() {
		this(0.0f, 0.0f, 0.0f);
	}
	public void readFields(DataInput in) throws IOException {
		x = in.readFloat();
		y = in.readFloat();
		z = in.readFloat();
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeFloat(x);
		out.writeFloat(y);
		out.writeFloat(z);
	}
	public String toString() {
		return "X:"+Float.toString(x) + ", "
				+ "Y:"+Float.toString(y) + ", "
				+ "Z:"+Float.toString(z);
	}
	public float distanceFromOrigin() {
		return (float) Math.sqrt( x*x + y*y +z*z);
	}
	public int compareTo(Point3D other) {
		return Float.compare(
				distanceFromOrigin(), 
				 other.distanceFromOrigin());
	}
	public boolean equals(Object o) {
		if( !(o instanceof Point3D)) {
			return false;
		}
		Point3D other = (Point3D) o;
		return this.x == other.x && this.y == other.y && this.z == other.z;
	}
	/* 实现 hashCode() 方法很重要
	 * Hadoop的Partitioners会用到这个方法,后面再说 
	 */
	public int hashCode() {
		return Float.floatToIntBits(x)
				^ Float.floatToIntBits(y)
				^ Float.floatToIntBits(z);
	}
	
}

下面讲数据输入输出格式和自定义数据输入输出格式 ,然后把上面讲过的自定义数据类型整合进去

首先看看输入文件a.txt

数据输入格式(InputFormat) 用于描述MapReduce作业的数据输入规范。MapReduce框架依靠数据输入格式完成输入规范检查(比如输入文件目录的检查)、对数据文件进行输入分块(也叫分片,InputSplit),以及提供从输入分块(分片)中将数据记录逐一读出,并转化为Map过程的输入键值对等功能

Hadoop提供了丰富的内置数据输入格式。最常用的数据输入格式包括:TextInputFormat和KeyValueInputFormat

TextInputFormat是系统默认的数据输入格式,可以将文本文件分块并逐行读入以便Map节点进行处理。读入一行时,所产生的主键Key就是当前行在整个文本文件中的字节偏移位置,而value就是该行的内容,它是系统默认的输入格式,当用户程序不设置任何数据输入格式时,系统自动使用这个数据输入格式。

比如如下文件内容

hello tanggao

hello hadoop

第一行的偏移量为0

第二行偏移量为13

KeyValueTextInputFormat是另一个常用的数据输入格式,可将一个按照<key,value>格式逐行存放的文本文件逐行读出,并自动解析生成相应的key和value

比如

姓名    汤高

年龄    20

则解析出来的

第一行键Key为姓名  值value为汤高

第二行键key为年龄 值value为20

注意和TextInputFormat不同,TextInputFormat是偏移量做键,整行内容做值

对于一个数据输入格式,都需要一个对应的RecordReader。RecordReader。主要用于将一个文件中的数据记录分拆成具体的键值对,传送给Map过程作为键值对输入参数。每一个数据输入格式都有一个默认的RecordReader。TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader

当然肯定还有很多数据输入格式和对应的默认RecordReader 这里就不接受了,有需要的可以去官网看看

数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范。MapReduce框架依靠数据输出格式完成输出规范检查(蔽日检查输出目录是否存在),以及提供作业结果数据输出等功能

Hadoop提供了丰富的内置数据输出格式。最常用的数据输出格式是TextOutputFormat,也是系统默认的数据输出格式,可以将计算结果以 key+\t+value的形式逐行输出到文本文件中。

与数据输入格式中的RecordReader类似,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。

TextOutputFormat的默认RecordWriter是LineRecordWriter,其实际操作是将结果数据以key+\t+value的形式输出到文本文件中。

当然同样肯定还有很多数据输出格式和对应的默认RecordWriter

对于自定义数据输入格式 可以参考已有的数据输入格式,继承自它即可,只要重写GetRecordReader方法得到一个自己写的RecordReader即可

我的是仿造KeyValueTextInputFormat和它的KeyValueLineRecordReader来自定义自己的输入格式的,所以我都是自己复制了上面两个类的源码然后进行自己的改写

代码语言:javascript
复制
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


public class myInputFormat extends FileInputFormat<Text,Text> {
	//用来压缩的
	@Override
	  protected boolean isSplitable(JobContext context, Path file) {
	    final CompressionCodec codec =
	      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
	    if (null == codec) {
	      return true;
	    }
	    return codec instanceof SplittableCompressionCodec;
	  }
	
	@Override
	public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException, InterruptedException {
		context.setStatus(genericSplit.toString());
		return new MyRecordReader(context.getConfiguration());
	}

}

他的RecordReader

代码语言:javascript
复制
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyRecordReader extends RecordReader<Text, Text> {

	 public static final String KEY_VALUE_SEPERATOR = 
			    "mapreduce.input.mylinerecordreader.key.value.separator";
			  
			  private final LineRecordReader lineRecordReader;
			  //源码是根据\t分割  我改为了我自己的需求为=号分割
			  private byte separator = (byte) '=';

			  private Text innerValue;

			  private Text key;
			  
			  private Text value;
			  
			  public Class<Text> getKeyClass() { return Text.class; }
			  
			  public MyRecordReader(Configuration conf)
			    throws IOException {
			    
			    lineRecordReader = new LineRecordReader();
			    String sepStr = conf.get(KEY_VALUE_SEPERATOR, "=");
			    this.separator = (byte) sepStr.charAt(0);
			  }

			  public void initialize(InputSplit genericSplit,
			      TaskAttemptContext context) throws IOException {
			    lineRecordReader.initialize(genericSplit, context);
			   
				
			  }
			  
			  public static int findSeparator(byte[] utf, int start, int length, 
			      byte sep) {
			    for (int i = start; i < (start + length); i++) {
			      if (utf[i] == sep) {
			        return i;
			      }
			    }
			    return -1;
			  }

			  public static void setKeyValue(Text key, Text value, byte[] line,
			      int lineLen, int pos) {
			    if (pos == -1) {
			      key.set(line, 0, lineLen);
			      value.set("");
			    } else {
			      key.set(line, 0, pos);
			      value.set(line, pos + 1, lineLen - pos - 1);
			    }
			  }
			  /** Read key/value pair in a line. */
			  public synchronized boolean nextKeyValue()
			    throws IOException {
			    byte[] line = null;
			    int lineLen = -1;
			    if (lineRecordReader.nextKeyValue()) {
			      innerValue = lineRecordReader.getCurrentValue();
			      line = innerValue.getBytes();
			      lineLen = innerValue.getLength();
			    } else {
			      return false;
			    }
			    if (line == null)
			      return false;
			    if (key == null) {
			      key = new Text();
			    }
			    if (value == null) {
			      value = new Text();
			    }
			    int pos = findSeparator(line, 0, lineLen, this.separator);
			    setKeyValue(key, value, line, lineLen, pos);
			    return true;
			  }
			  
			  public Text getCurrentKey() {
			    return key;
			  }

			  public Text getCurrentValue() {
			    return value;
			  }

			  public float getProgress() throws IOException {
			    return lineRecordReader.getProgress();
			  }
			  
			  public synchronized void close() throws IOException { 
			    lineRecordReader.close();
			  }
}

对于自定义数据输出格式 可以参考已有的数据输出格式,继承自它即可,只要重写GetRecordWriter方法得到一个自己写的RecordWriter即可

一种实现:我们一般继承自FileOutputFormat来改写

因为FileOutputFormat已经帮我们实现了许多通用的功能

代码语言:javascript
复制
package com.my.input;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat<K, V> extends FileOutputFormat<K, V> {
	public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";

	protected static class MyLineRecordWriter<K, V> extends RecordWriter<K, V> {
		private static final String utf8 = "UTF-8";
		private static final byte[] newline;

		static {
			try {
				newline = "\n".getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}

		protected DataOutputStream out;
		private final byte[] keyValueSeparator;

		public MyLineRecordWriter(DataOutputStream out, String keyValueSeparator) {
			this.out = out;
			try {
				this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
			} catch (UnsupportedEncodingException uee) {
				throw new IllegalArgumentException("can't find " + utf8 + " encoding");
			}
		}
		//改写了源码  把\t改为了=========>
		public MyLineRecordWriter(DataOutputStream out) {
			this(out, "=========>");
		}

		/**
		 * Write the object to the byte stream, handling Text as a special case.
		 * 
		 * @param o
		 *            the object to print
		 * @throws IOException
		 *             if the write throws, we pass it on
		 */
		private void writeObject(Object o) throws IOException {
			if (o instanceof Text) {
				Text to = (Text) o;
				out.write(to.getBytes(), 0, to.getLength());
			} else {
				out.write(o.toString().getBytes(utf8));
			}
		}

		public synchronized void write(K key, V value) throws IOException {

			boolean nullKey = key == null || key instanceof NullWritable;
			boolean nullValue = value == null || value instanceof NullWritable;
			if (nullKey && nullValue) {
				return;
			}
			if (!nullKey) {
				writeObject(key);
			}
			if (!(nullKey || nullValue)) {
				out.write(keyValueSeparator);
			}
			if (!nullValue) {
				writeObject(value);
			}
			out.write(newline);
		}

		public synchronized void close(TaskAttemptContext context) throws IOException {
			out.close();
		}
	}

	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		String keyValueSeparator = conf.get(SEPERATOR, "=========>");
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		FileSystem fs = file.getFileSystem(conf);
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyLineRecordWriter<K, V>(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new MyLineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
					keyValueSeparator);
		}
	}
}

第二种实现:FileOutputFormat的一个重要子类就是TextOutputFormat,我们也可以继承它,然后重写getRecordWriter方法即可

代码语言:javascript
复制
package com.my.input;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class MyOutputFormat2<K, V> extends TextOutputFormat<K, V> {
	
	@Override
	public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
		Configuration conf = job.getConfiguration();
		boolean isCompressed = getCompressOutput(job);
		//改写了源码  把\t改为了=========>
		String keyValueSeparator = conf.get(SEPERATOR, "=========>");
		CompressionCodec codec = null;
		String extension = "";
		if (isCompressed) {
			Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
			codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
			extension = codec.getDefaultExtension();
		}
		Path file = getDefaultWorkFile(job, extension);
		FileSystem fs = file.getFileSystem(conf);
		if (!isCompressed) {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
		} else {
			FSDataOutputStream fileOut = fs.create(file, false);
			return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),
					keyValueSeparator);
		}
	}
}

最后就可以用来测试了

测试代码

代码语言:javascript
复制
package com.my.input;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.tg.type.Point3D;




public class Point3DDriver {

	/**
	 * 
	 * @author 汤高 
	 * Point3D为自定义数据类型  把它作为map的输出类型
	 *        
	 */
	// Map过程
	static int count=0;
	public static class MyMapper extends Mapper<Text, Text, Text, Point3D> {
		/***
		 * 
		 */
		@Override
		protected void map(Text key, Text value, Mapper<Text, Text, Text, Point3D>.Context context)
				throws IOException, InterruptedException {
			count++;
			//这里得到的键是自定义输入格式输出的内容  本例是 One 、two、three
			//这里得到的值是X:1.0, Y:2.0, Z:3.0 等
			//根据都好截取值里面的内容 分别设置到自定义数据类型Point3D里面去
			String[] vs = value.toString().split(",");
			Point3D p = new Point3D(Float.parseFloat(vs[0].split(":")[1]), Float.parseFloat(vs[1].split(":")[1]), Float.parseFloat(vs[2].split(":")[1])	);
				// 写出去 把自定义数据类型输出去
			
				context.write(new Text(key), p);
				System.out.println("几个map==========>"+count);
		}
	}
	//Reduce过程
	public static class MyReducer extends Reducer<Text, Point3D, Text, Point3D>{
		
		protected void reduce(Text key, Point3D value,
				Reducer<Text, Point3D, Text, Point3D>.Context context) throws IOException, InterruptedException {
			
			context.write(key, value);
		}
		
	}

	public static void main(String[] args) {

		try {
			Configuration conf = new Configuration();
			String[] paths = new GenericOptionsParser(conf, args).getRemainingArgs();
			if (paths.length < 2) {
				throw new RuntimeException("usage <input> <output>");
			}

			Job job = Job.getInstance(conf, "Point3DDriver");
			job.setJarByClass(Point3DDriver.class);

			job.setMapperClass(MyMapper.class);
			job.setReducerClass(MyReducer.class);
			job.setInputFormatClass(myInputFormat.class);
			
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(Point3D.class);
			job.setOutputFormatClass(MyOutputFormat.class);
			//job.setOutputFormatClass(MyOutputFormat2.class);
			FileInputFormat.addInputPaths(job, paths[0]);
			FileOutputFormat.setOutputPath(job, new Path(paths[1] + System.currentTimeMillis()));// 整合好结果后输出的位置
			System.exit(job.waitForCompletion(true) ? 0 : 1);// 执行job
			

		} catch (IOException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

对于编写Map函数和Reduce函数不熟悉的朋友,可以参看我上篇博客 里面讲解了如何实现MapReduce编程  

MapReduce工作原理详解   

结果:

码字不易,转载请指明出自  https://cloud.tencent.com/developer/article/1018619

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-05-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档