原文地址:http://blog.csdn.net/chengyuqiang/article/details/73410106
从Hadoop0.20开始Hadoop提供了两套MapReduce API,新的API在旧API基础上进行封装,在扩展性和易用性等方面有显著提高。旧API已经被废弃,不再介绍,新API在org.apache.hadoop.mapreduce包中,下面将对该包下的重要类和接口进行介绍。
InputFormat抽象类仅有两个抽象方法:
List<InputSplit> getSplits()
, 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。RecordReader<K,V> createRecordReader()
,创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题InputFormat的常用子类
1)FileInputFormat<K,V>
这个是基本的父类,自定义就直接使用它作为父类。
2)TextInputFormat<LongWritable,Text>
这个是默认的数据格式类。key代表当前行数据距离文件开始的距离,value代码当前行字符串。
3)SequenceFileInputFormat<K,V>
这个是序列文件输入格式,使用序列文件可以提高效率,但是不利于查看结果,建议在过程中使用序列文件,最后展示可以使用可视化输出。
4)KeyValueTextInputFormat<Text,Text>
这个是读取以Tab(也即是\t)分隔的数据,每行数据如果以\t分隔,那么使用这个读入,就可以自动把\t前面的当做key,后面的当做value。
5)CombineFileInputFormat<K,V>
合并大量小数据是使用。
6)MultipleInputs
,多种输入,可以为每个输入指定逻辑处理的Mapper。
序列化是指将对象转化为字符流以便在网络上传输或者写入磁盘持久化保存,而反序列化是指将字符流转化为对象的过程。 Hadoop主要在两个方面使用序列化技术:RPC和数据持久化。 hadoop在节点间的内部通讯使用的是RPC,RPC协议把消息翻译成二进制字节流发送到远程节点,远程节点再通过反序列化把二进制流转成原始的信息。RPC的序列化需要实现以下几点: 1.压缩,可以起到压缩的效果,占用的宽带资源要小。 2.快速,内部进程为分布式系统构建了高速链路,因此在序列化和反序列化间必须是快速的,不能让传输速度成为瓶颈。 3.可扩展的,新的服务端为新的客户端增加了一个参数,老客户端照样可以使用。 4.兼容性好,可以支持多个语言的客户端。
Hadoop的Writable接口提供了序列化标准。Writable接口有如下两个方法:
Hadoop提供了多钟Writable实现类,存放于org.apache.hadoop.io包中,这些类的层次结构如下图所示。
其中,WritableComparable接口继承了Writable接口和Comparable接口。
public interface WritableComparable<T>
extends Writable, Comparable<T>
请注意:Java类支持单继承,而Java接口可以多继承。不允许类多重继承的主要原因是,如果A同时继承B和C,而B和C同时有一个D方法,A如何决定该继承那一个呢?但接口不存在这样的问题,接口全都是抽象方法继承谁都无所谓,所以接口可以继承多个接口。
下表列出了Java基本类型以及Writable类对应关系。
Java基本类型 | Writable | 序列化后字节数 |
---|---|---|
boolean | BooleanWritable | 1 |
byte | ByteWritable | 1 |
short | ShortWritable | 2 |
int | IntWritable | 4 |
int | VIntWritable | 1–5 |
float | FloatWritable | 4 |
long | LongWritable | 8 |
long | VLongWritable | 1–9 |
double | DoubleWritable | 8 |
不同的Writable类序列化后占用的字数长度是不一样的,需要综合考虑应用中数据特征选择合适的类型。对于整数类型有两种Writable类型可以选择,一种是定长Writable类型IntWritable和LongWritable;另一种是变长Writable类型VIntWritable和VLongWritable。定长类型顾名思义使用固定长度的字节数表示,比如一个IntWritable类型使用4个长度的字节表示一个int;变长类型则根据数值的大小使用相应的字节长度表示,当数值在-112~127之间时使用1个字节表示,在-112~127范围之外的数值使用头一个字节表示该数值的正负符号以及字节长度(zero-compressed encoded integer)。定长的Writable类型适合数值均匀分布的情形,而变长的Writable类型适合数值分布不均匀的情形,一般情况下变长的Writable类型更节省空间,因为大多数情况下数值是不均匀的,对于整数类型的Writable选择,除非对数据的均匀分布很有把握,否则使用变长Writable类型,除非数据的取值区间确定在int范围之内,否则为了程序的可扩展性,请选择VLongWritable类型。
对于这些Writable类型,可以通过get方法获取被封装的原来Java基本类型的值,或者通过set方法将Java基本数据类型数据转换成Writable类型,除Text外。Text支持通过set方法将字符串转换为Text类型,通过toString方法获取原字符串内容。
在前面MapReduce程序中已经介绍了创建Mapper类和Reduce类,这里再对这两个重要的类补充说明一下。 Mapper类位于org.apache.hadoop.mapreduce包下,一般情况下我们只需继承该Mapper类重写map方法即可实现map操作。除了map方法,Mapper类还有setup、cleanup和run方法。
方法 | 说明 |
---|---|
setup | 进行map前的准备工作 |
map | 承担主要的键值对处理工作 |
cleanup | 收尾工作,如关闭文件或键值对分发 |
run | 提供了setup–>map–>cleanup的执行模版 |
Hadoop自带了一些Mapper类的实现,比如InverseMapper类和TokenCounterMapper类。InverseMapper类用于调换键值对的顺序再原样输出,TokenCounterMapper类的作用和WordCount中的Mapper类作用相同,同于单词计数。
与Mapper类类似,Reducer类位于位于org.apache.hadoop.mapreduce包下,一般情况下用户只需继承该类根据需求重写reduce方法即可。同样,Reducer类也有setup、reduce、cleanup和run四个方法。setup方法用于reduce前的准备工作,reduce方法承担对键值对的处理工作,cleanup负责收尾工作,run方法提供了setup–>map–>cleanup的执行模版。
Partitioner是个抽象类,该类只有一个方法getPartition用于返回分区号。
public abstract class Partitioner<KEY, VALUE> {
/**
* Get the partition number for a given key (hence record) given the total
* number of partitions i.e. number of reduce-tasks for the job.
*
* <p>Typically a hash function on a all or a subset of the key.</p>
*
* @param key the key to be partioned.
* @param value the entry value.
* @param numPartitions the total number of partitions.
* @return the partition number for the <code>key</code>.
*/
public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
Partitioner的实现类HashPartitioner源码
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}