前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解wordcount(TextInputFormat工作机制)

详解wordcount(TextInputFormat工作机制)

作者头像
平凡的学生族
发布2019-05-25 09:33:55
1K0
发布2019-05-25 09:33:55
举报
文章被收录于专栏:后端技术后端技术

阅前注意事项

在看教程前,要先注意hadoop有新旧两版的api:

  1. 新版对应1.x版本,org.apache.hadoop.mapreduce.*。主要内容涉及新版本的API接口以及一些新特性(比如MapReduce安全)
  2. 旧版对应0.x版本,org.apache.hadoop.mapred.*。这里面主要包含旧的API接口以及MapReduce各个服务(JobTracker以及TaskTracker)的实现。一些hadoop1和2框架公用的部分也在此。

而百度上大部分教程都是用的hadoop0.x版本的api,容易误导新人,所以在看参考资料时要留意版本,学习合适的部分

问题引子

首先,在wordcount中,默认的InputFormatTextInputFormat,那么,TextInputFormat是如何把一个文本分为多行,再交给每个Mapper的呢?

如果一个行被切分到两个split里(这几乎是一定会发生的情况),TextInputFormat是如何处理的?如果是生硬地把一行切割到两个split里,是对数据的一种破坏,可能会影响数据分析的正确性(比如WordCount就是一个例子).

解释

首先,本文会提到的类有如下这些: (以下的类都在org.apache.hadoop.mapreduce包内,别看到里去了)

(A->B代表A extends B)

  1. org.apache.hadoop.mapreduce包内
    1. FileSplit -> InputSplit
    2. TextInputFormat -> FileInputFormat -> InputFormat
    3. LineRecordReader -> RecordReader
    4. MapContextImpl -> MapContext,Mapper.Context -> MapContext
  2. org.apache.hadoop.mapred包内
    1. JvmTask
    2. MapTask

工作过程是如下几个步骤:

步骤0. 切割Split

在Job提交前,客户端就会调用FileInputFormatpublic List<InputSplit> getSplits(JobContext job)将一个文本按尽可能相等的字节长度,切割为一个个FileSplit。 详情见下文博客 https://blog.csdn.net/ltliyue/article/details/51292312?utm_source=blogxgwz9

步骤1. 建立Mapper

  1. JvmTaskpublic void readFields(DataInput in)内的语句t = new MapTask();创建MapTask对象
  2. MapTaskpublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)内的语句中:
代码语言:javascript
复制
  if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
      } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
}

由于是新版本API,会调用runNewMapper 3.MapTaskvoid runNewMapper(...)中(请先阅读此方法的源代码,以便理解下文),就会创建各种Mapper要用到的参数,包括Mapper、InputFormat、InputSplit、RecordReader、MapContext,之后会运行:

代码语言:javascript
复制
  input.initialize(split, mapperContext); // input类型是RecordReader
  mapper.run(mapperContext);

InputFormat默认为TextInputFormat的情况下,input的实际类型是LineRecordReader,所以会调用相应的函数实现。 在这两句中,Mapper会初始化,并且准备运行。

步骤2. Mapper初始化

  1. 每个Mapper会被分配到一个Split,在runNewMapper中可以包装成RecordReader。 看MapTask::runNewMapper中的语句: org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext); 而NewTrackingRecordReader是MapTask的内部类,其构造函数为: NewTrackingRecordReader(...) throws InterruptedException, IOException { this.reporter = reporter; this.inputRecordCounter = reporter .getCounter(TaskCounter.MAP_INPUT_RECORDS); this.fileInputByteCounter = reporter .getCounter(FileInputFormatCounter.BYTES_READ); ... this.real = inputFormat.createRecordReader(split, taskContext); // 重要 ... } 在注释了"重要"的那行调用了InputFormat(此处为TextInputFormat)的 createRecordReader方法,将一个FileSplit包装为一个 LineRecordReader
  2. 还是在MapTask::runNewMapper函数里 org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split); org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext); RecordReader会被包装进MapContextImpl实例,然后被作为拷贝模板传递给WrappedMapper.Context实例中(org.apache.hadoop.mapreduce.lib.map.WrappedMapper)。

之后,runNewMapper函数会调用mapper.run(mapperContext);开始运行。

步骤3. Mapper运行

Mapper::run(Context context)的代码如下:

代码语言:javascript
复制
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }

需要注意: 该函数会循环调用 context.nextKeyValue()来获取key-value对。

  • context.nextKeyValue调用的是WrappedMapper::nextKeyValue,后者返回return mapContext.nextKeyValue();。所以实际调用的是MapContextImpl::nextKeyValue(),而它又返回return reader.nextKeyValue();所以最终调用的是RecordReader::nextKeyValue()nextKeyValue的函数实现详见WrappedMapperMapContextImplLineRecordReader

所以Mapper的运行过程中,循环调用的是LineRecordReadernextKeyValue函数。我们看到LineRecordReader::initializeLineRecordReader::nextKeyValue

代码语言:javascript
复制
public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    ...
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    ...
代码语言:javascript
复制
public boolean nextKeyValue() throws IOException {
    ...
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }
    }
    ...
  }

看到两个函数调用的in.readLine我们知道,它用了巧妙的办法对付两个Split分割一个句子的情况。

  1. 对于非第一个Split,它首先在initialize里读取第一行,再在nextKeyValue里一直读取,直到结束位置在Split的边界之后。
  2. 对于第一个Split,就只是在nextKeyValue里一直读取,直到结束位置在Split的边界之后。

总结来说,对于每个Split,都会在最后多读一行,相应的,开头就略去一行。而第一个Split不需要略去开头(顶多略去utf-8的标记)

所以,总的来说:

  • 从宏观上,一个文本会以字节为单位,被分为多个Split.
  • 从微观上,对于每个Split,都会通过略去开头一句话,多读结尾一句话的方法,避免句子被Split边界给切割开。

就像下图所示:

正所谓"上有政策,下有对策"

附记

通过大量调用抽象类的方法(而不是具体类)可以将软件架构解耦,提高各个模块的独立性,易于修改和替换。 但缺点是不易于学习者用"顺藤摸瓜"的方式学习整个系统。需要大量地查阅网络资料。

参考

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.11.02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 阅前注意事项
  • 问题引子
  • 解释
    • 步骤0. 切割Split
      • 步骤1. 建立Mapper
        • 步骤2. Mapper初始化
          • 步骤3. Mapper运行
          • 附记
          • 参考
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档