浅谈Hadoop Distcp工具的InputFormat

导语

从Hadoop的出现到现在已经超过十年了,它在大数据领域扮演着一个重要的角色,相信在Hadoop的使用过程中,或多或少的都会用到自带的一个常用工具,就是Hadoop的distcp工具,这篇文章就是简单的方式去了解他的拷贝策略原理。

背景

在集群迁移或者数据跨集群同步的过程中,必要少不了数据拷贝的动作,在同一个集群内,跨NameSpace的数据拷贝,你可以使用distcp,你也可以自己实现类似facebook提供的fastcopy的拷贝(社区好像没提供),那么在使用distcp工具的过程中,其中的一些参数到底影响了什么,他是一个怎样的原理,今天就和大家简单的分享下。

我们在命令行执行hadoop distcp命令回车,就会看到他所支持的很多参数,其中在命令行拷贝策略(-strategy)选项中,有两个参数可选参数:dynamic,uniformsize。在默认情况下使用的是uniformsize,含义是distcp的每个map会相对均衡去复制数据量大小的文件。

我们通过查看源码容易可以看出,除了命令行选项之外,distcp还能默认的去加载distcp-default.xml,我们可以放置到$HADOOP_CONF_DIR下,我们可以配置相对常用的参数到这个文件中。

  public DistCp(Configuration configuration, DistCpOptions inputOptions) throws Exception {
    Configuration config = new Configuration(configuration);
    config.addResource(DISTCP_DEFAULT_XML);
    setConf(config);
    this.inputOptions = inputOptions;
    this.metaFolder   = createMetaFolderPath();
  }

除此之外,我们还从默认的配置当中,看到了两个参数:

<property>
<name>distcp.dynamic.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
<description>Implementation of dynamic input format</description>
</property>

<property>
<name>distcp.static.strategy.impl</name>
<value>org.apache.hadoop.tools.mapred.UniformSizeInputFormat</value>
<description>Implementation of static input format</description>
</property>

这个就是上述说的两种命令行策略的参数模式。通过命名可以很容易可以看出,其实这就是两个InputFormat的实现类,distcp任务(其实也就是MR任务),通过配置命令行或者参数指定使用不同的inputFormat生成不同的splits,从而实现不同的拷贝文件的逻辑。

然而,既然有两个选项,那他们的区别在哪呢?我们可以简单的看看下图的一个整体结构

DynamicInputFormat

对于DynamicInputFormat来说,有几个重要的相关的类:DynamicRecordReader,DynamicInputChunk,DynamicInputChunkContext。

对于distcp任务,会先生成一个copy-listing文件,该文件包含复制文件的列表等信息,DynamicInputFormat的getSplits方法就是将这些切分为不同chunk,然后分配到不同的task中。

在切分copy-listing文件到不同的chunk当中,其中有几个变量,numMaps和numRecords得到splitRatio的比例,也就是算出平均每个map处理多少个chunk,然后通过总的records数量,算出每个chunk中有多少条records

static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
int maxChunksIdeal = getMaxChunksIdeal(conf);
int minRecordsPerChunk = getMinRecordsPerChunk(conf);
int splitRatio = getSplitRatio(conf);

if (nMaps == 1) {
LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
return 1;
}

if (nMaps > maxChunksIdeal)
return splitRatio;

int nPickups = (int)Math.ceil((float)maxChunksIdeal/nMaps);
int nRecordsPerChunk = (int)Math.ceil((float)nRecords/(nMaps*nPickups));

return nRecordsPerChunk < minRecordsPerChunk ?
splitRatio : nPickups;
}

最终会将所有的record放到不同的chunk中,在hdfs上会在对应目录行程对应的文件类似fileList.seq.chunk.0000x

drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248
drwx------ - hadoop supergroup 0 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir
-rw-r--r-- 1 hadoop supergroup 1504 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00002
-rw-r--r-- 1 hadoop supergroup 1486 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/fileList.seq.chunk.00003
-rw-r--r-- 1 hadoop supergroup 1646 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000000
-rw-r--r-- 1 hadoop supergroup 1524 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/chunkDir/task_1526024399954_0017_m_000001
-rw-r--r-- 1 hadoop supergroup 6686 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq
-rw-r--r-- 1 hadoop supergroup 5906 2018-05-13 17:50 /emr/hadoop-yarn/staging/hadoop/.staging/_distcp1061656248/fileList.seq_sorted

然后map通过DynamicRecordReader去读取数据的时候就会将对应的chunk文件修改为task(task_1526024399954_0017_m_000000)名字,所以通过上面的文件夹输入可以看出,这时有两个map正在对数据进行拷贝,执行速度快的map会继续读取未被领取的chunk进行拷贝,这就让速度快的map可以对更多的数据进行拷贝

UniformSizeInputFormat

对于默认的UniformSizeInputFormat,他的实现方式比DynamicInputFormat简单,原理其实就是得到总的totalSizeBytes,然后除以map数量得到平均每个map处理多少数据,然后当文件的大小加起来大于nBytesPerSplit的时候,就形成一个split,这样是希望每个map处理的数据差距不会太大。

带宽控制

带宽控制主要实现在ThrottledInputStream当中,他在hadoop除了在distcp之外,也用在了NameNode之间的FSImage传输等场景上的使用,原理就是,他继承了原有的InputStream并在每次读取的时候进行每秒获取字节的速率检查(throttle),如果超过,则进行sleep:

  /**
   * Read bytes starting from the specified position. This requires rawStream is
   * an instance of {@link PositionedReadable}.
   */
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    if (!(rawStream instanceof PositionedReadable)) {
      throw new UnsupportedOperationException(
          "positioned read is not supported by the internal stream");
    }
    throttle();
    int readLen = ((PositionedReadable) rawStream).read(position, buffer,
        offset, length);
    if (readLen != -1) {
      bytesRead += readLen;
    }
    return readLen;
  }

  private void throttle() throws IOException {
    while (getBytesPerSec() > maxBytesPerSec) {
      try {
        Thread.sleep(SLEEP_DURATION_MS);
        totalSleepTime += SLEEP_DURATION_MS;
      } catch (InterruptedException e) {
        throw new IOException("Thread aborted", e);
      }
    }
  }

  /**
   * Getter for the read-rate from this stream, since creation.
   * Calculated as bytesRead/elapsedTimeSinceStart.
   * @return Read rate, in bytes/sec.
   */
  public long getBytesPerSec() {
    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
    if (elapsed == 0) {
      return bytesRead;
    } else {
      return bytesRead / elapsed;
    }
  }

总结:

除了本文说的参数之外,我们平时在数据拷贝的过程中,我们还可以综合的通过控制map的数量,控制带宽阈值去减少这个过程对线上系统的影响,其中还有update参数等等,我们可以按照自身的业务需求去调整自身的参数,从而达到一个相对最优的数据拷贝效果。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏架构师之路

如何实现超高并发的无锁缓存?

一、需求缘起 【业务场景】 有一类写多读少的业务场景:大部分请求是对数据进行修改,少部分请求对数据进行读取。 例子1:滴滴打车,某个司机地理位置信息的变化(可能...

60170
来自专栏JackieZheng

AngularJS入门心得2——何为双向数据绑定

  前言:谁说Test工作比较轻松,最近在熟悉几个case,差点没疯。最近又是断断续续的看我的AngularJS,总觉得自己还是没有入门,可能是自己欠前端的东西...

21480
来自专栏老安的博客

zabbix 自动发现tomcat的war包并实现监控

14820
来自专栏CDA数据分析师

总结:常用的 Python 爬虫技巧

用python也差不多一年多了,python应用最多的场景还是web快速开发、爬虫、自动化运维:写过简单网站、写过自动发帖脚本、写过收发邮件脚本、写过简单验证码...

22550
来自专栏FreeBuf

对自助提卡系统的一次代码审计

并非有意愿要审计该站,前面的走的黑盒没有过于精彩部分就不在贴上了,对于此系统站你们懂的,多说无益,这套程序是开源的,像这种自助提卡系统相信大家已经不在陌生了,很...

20230
来自专栏数据和云

Linux 内存中的Cache,真的能被回收么?

编辑手记:很多人都认为,Linux中buffers和cached所占用的内存空间是可以在内存压力较大的时候被释放当做空闲空间用的。但真的是这样么?今天我们重新来...

507110
来自专栏北京马哥教育

看完你就会!Python自动化开发必备项目之博客网站全实现

本文由马哥教育Python自动化实战班6期学员推荐,转载自互联网,作者为 lm409,内容略经小编改编和加工,观点跟作者无关,最后感谢作者的辛苦贡献与付出。 断...

67770
来自专栏枕边书

代码迁移之旅(二)- 渐进式迁移方案

说在前面 这是代码迁移的第二篇文章,也是最后一篇了,由于个人原因,原来的迁移我无法继续参与了,但完整的方案我已经准备好了,在测试环境也已经可以正常进行了。 上篇...

25890
来自专栏小巫技术博客

推荐:Mac下高效静态代码分析神器Unstand详解

15810
来自专栏流柯技术学院

接口测试之webservice

Web service是一个平台独立的,低耦合的,自包含的、基于可编程的web的应用程序,可使用开放的XML(标准通用标记语言下的一个子集)标准来描述、发布、发...

47830

扫码关注云+社区

领取腾讯云代金券