在看教程前,要先注意hadoop有新旧两版的api:
而百度上大部分教程都是用的hadoop0.x版本的api,容易误导新人,所以在看参考资料时要留意版本,学习合适的部分
首先,在wordcount
中,默认的InputFormat
是TextInputFormat
,那么,TextInputFormat
是如何把一个文本分为多行,再交给每个Mapper
的呢?
如果一个行被切分到两个split里(这几乎是一定会发生的情况),
TextInputFormat
是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如WordCount就是一个例子).
首先,本文会提到的类有如下这些:
(以下的类都在org.apache.hadoop.mapreduce
包内,别看到里去了)
(A->B代表A extends B)
org.apache.hadoop.mapreduce
包内 org.apache.hadoop.mapred
包内 工作过程是如下几个步骤:
在Job提交前,客户端就会调用FileInputFormat
的public List<InputSplit> getSplits(JobContext job)
将一个文本按尽可能相等的字节长度,切割为一个个FileSplit
。
详情见下文博客
https://blog.csdn.net/ltliyue/article/details/51292312?utm_source=blogxgwz9
Mapper
JvmTask
的public void readFields(DataInput in)
内的语句t = new MapTask();
创建MapTask
对象MapTask
的public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
内的语句中: if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
由于是新版本API,会调用runNewMapper
3.MapTask
的void runNewMapper(...)
中(请先阅读此方法的源代码,以便理解下文),就会创建各种Mapper
要用到的参数,包括Mapper、InputFormat、InputSplit、RecordReader、MapContext
,之后会运行:
input.initialize(split, mapperContext); // input类型是RecordReader
mapper.run(mapperContext);
在InputFormat
默认为TextInputFormat
的情况下,input
的实际类型是LineRecordReader
,所以会调用相应的函数实现。
在这两句中,Mapper
会初始化,并且准备运行。
Mapper
会被分配到一个Split
,在runNewMapper
中可以包装成RecordReader
。
看MapTask::runNewMapper
中的语句: org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
而NewTrackingRecordReader是MapTask的内部类,其构造函数为: NewTrackingRecordReader(...) throws InterruptedException, IOException { this.reporter = reporter; this.inputRecordCounter = reporter .getCounter(TaskCounter.MAP_INPUT_RECORDS); this.fileInputByteCounter = reporter .getCounter(FileInputFormatCounter.BYTES_READ); ... this.real = inputFormat.createRecordReader(split, taskContext); // 重要 ... }
在注释了"重要"的那行调用了InputFormat
(此处为TextInputFormat
)的
createRecordReader
方法,将一个FileSplit
包装为一个
LineRecordReader
。MapTask::runNewMapper
函数里 org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext);
RecordReader
会被包装进MapContextImpl
实例,然后被作为拷贝模板传递给WrappedMapper.Context
实例中(org.apache.hadoop.mapreduce.lib.map.WrappedMapper
)。之后,runNewMapper
函数会调用mapper.run(mapperContext);
开始运行。
Mapper::run(Context context)
的代码如下:
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
}
需要注意:
该函数会循环调用 context.nextKeyValue()
来获取key-value对。
context.nextKeyValue
调用的是WrappedMapper::nextKeyValue
,后者返回return mapContext.nextKeyValue();
。所以实际调用的是MapContextImpl::nextKeyValue()
,而它又返回return reader.nextKeyValue();
。所以最终调用的是RecordReader::nextKeyValue()
。nextKeyValue
的函数实现详见WrappedMapper
,MapContextImpl
和LineRecordReader
。所以Mapper
的运行过程中,循环调用的是LineRecordReader
的nextKeyValue
函数。我们看到LineRecordReader::initialize
和LineRecordReader::nextKeyValue
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
...
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
...
public boolean nextKeyValue() throws IOException {
...
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
}
...
}
看到两个函数调用的in.readLine
我们知道,它用了巧妙的办法对付两个Split分割一个句子的情况。
initialize
里读取第一行,再在nextKeyValue
里一直读取,直到结束位置在Split的边界之后。nextKeyValue
里一直读取,直到结束位置在Split的边界之后。总结来说,对于每个Split,都会在最后多读一行,相应的,开头就略去一行。而第一个Split不需要略去开头(顶多略去utf-8
的标记)
所以,总的来说:
就像下图所示:
正所谓"上有政策,下有对策"啊
通过大量调用抽象类的方法(而不是具体类)可以将软件架构解耦,提高各个模块的独立性,易于修改和替换。 但缺点是不易于学习者用"顺藤摸瓜"的方式学习整个系统。需要大量地查阅网络资料。