前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >五分钟深入 Hadoop 输入优化

五分钟深入 Hadoop 输入优化

作者头像
包子面试培训
发布2018-04-19 11:20:15
4630
发布2018-04-19 11:20:15
举报
文章被收录于专栏:包子铺里聊IT

当面试公司问起 Hadoop 经验时,我们当然不能只停留在 Mapper 干了什么、Reducer 干了什么。没有 Performance Tuning 怎么能显示出我们的高大上呢?

下面几篇文章,包子培训将深入浅出的讲解 advanced hadoop tuning. 力争让你在面试中滔滔不绝,震住你的面试官。:)

不对 input 做精心处理有可能彻底毁掉 Hadoop 的强大功能

大家还记得我们第一篇 <5分钟零基础搞懂 Hadoop> 中的例子吗?

我们把数据分别存贮在好多台机器上,然后让各台机器处理自己上的数据。

但是, 实际情况中有可能发生:

Host A 上数据超级多, 其他所有 Host B, C, D…上超级少。

假设每台机器处理数据的速度是一样的,比如 CPU、内存、硬盘性能一样,那么显然 host A 要比其他 host 花更多时间处理数据。这会造成当 host B,C,D…都处理完数据后,一直等待 host A 处理。

我们想一想, 如果可以让 host A 上的一些数据转移到 B,C,D…上,让所有 host 有同样多的数据,那我们的整个 Hadoop Job 就可以更快完成了!

企业从实际情况中总结的经验告诉我们,节省的时间不只几分钟,长的可以到几小时!

如果这个情况发生在 Hadoop 集群里,考虑到 Hadoop 中的每台机器需要在 mapper 中把处理结果传给 reduce 阶段,由于 host A 上的数据远多于其他 host, host A 花在处理数据的时间,加上 disk 读写的时间,加上花在网络传输数据的时间,都会比其他机器多更多!

所以,尽量平均分配每台机器要处理的数据,真是势在必行!

那我们应当如何平衡每台机器的负担呢?我们先了解一下 Hadoop 如何读取数据的吧,只有搞清楚工具本身,我们才能更好的利用它。

Hadoop 是怎么处理 input 的?

大家还记得我们上篇文章里介绍用户如何向 JobTracker 提交 Job 吗? (点击 阅读原文 查看上一篇文章)

用户的代码,无论是 Java 还是别的语言,都需要使用 Hadoop 提供的一个 client side library JobClient. 它是用户代码和 JobTracker 打交道的接口。

运行 JobClient.runJob() 会让 JobClient 把所有 Hadoop Job 的信息,比如 mapper reducer jar path, mapper / reducer class name, 输入文件的路径等等,告诉给 JobTracker 并开始运行这个 Job.

除此之外,JobClient.runJob() 还会做一件事:使用 InputFormat class 去计算如何把 input file 分割成一份一份,然后交给 mapper 处理。

inputformat.getSplit() 函数返回一个 InputSplit 的 List, 每一个 InputSplit 就是一个 mapper 需要处理的数据。

通常,我们把 input file 存在 HDFS,(我们上篇文章提到的分布式文件系统)HDFS 可以存储很大很大的文件,为了让普通计算机可以存储这么大的文件,HDFS 把这个文件分割成若干小块 (block),然后把这些小块分别存储在不同机器上,HDFS 会记住每个小块存储的机器信息以及路径。这样,当用户需要读取一个文件时,只要告诉 HDFS 文件名,HDFS 就可以到存储这个文件的那些机器上把文件小块组合起来,返回给用户。

一个 Hadoop Job的 input 既可以是一个很大的 file, 也可以是多个 file; 无论怎样,getSplit() 都会计算如何分割 input.

我们需要注意的是:输入文件在 HDFS 中以小块形式存储在不同机器上,那 Split 需要包含哪些内容才能让每个 mapper 清楚的知道它要处理的数据在哪里呢?

我们仔细想一想,在一个分布式环境中, 能够唯一的确定一个文件的位置的信息应该包括:

  • 文件小块所在机器 ID;
  • 文件在 HDFS 上的路径;

我们还需要注意:Split 有可能是文件一个整小块,或者多个整小块,那以上信息就够了。但是当 getSplit() 平均分配时,很有可能一个 split 得到的是小块当中的一部分,或者几个整小块加上一个小块的一 部分。那么我们还需要

  • 文件起始位置
  • 文件包括长度

这样, 每个 Split 就可以包括一个 mapper 要处理的数据的精确位置:文件在哪儿、文件中的哪些数据。

注意,Split 里存储的都是待处理数据的位置信息,并不是数据本身。

我们打开 inputFormat 的 java doc 可以发现,它实际上是个 interface, 需要 class 来继承,提供分割 input 的逻辑。

Jobclient 有一个方法叫 setInputFormat(), 通过它,我们可以告诉 JobTracker 想要使用的 InputFormat class 是什么。如果我们不设置,Hadoop默认的是 TextInputFormat, 它默认为文件在 HDFS上的每一个 Block 生成一个对应的 InputSplit. 所以大家使用 Hadoop 时,也可以编写自己的 input format, 这样可以自由的选择分割 input 的算法,甚至处理存储在 HDFS 之外的数据。

我们上篇文章说 JobTracker 尽量把 mapper 安排在离它要处理的数据比较近的机器上,以便 mapper 从本机读取数据,节省网络传输时间。具体实现是这么回事儿:对于每个 map task, 我们知道它的 split 包含的数据所在的 host 位置,我们就把 mapper 安排在那个 host 上好了,至少是比较近的host. 你可能会问:split 里存储的 host 位置是 HDFS 存数据的 host 啊,和 MapReduce 的 host 有什么相关呢?为了达到数据本地性,其实通常把MapReduce 和 HDFS 部署在同一组 host 上。

既然一个 InputSplit 对应一个 map task, 那么当 map task 收到它所处理数据的位置信息,它就可以从 HDFS 读取这些数据了。HDFS 上的 input file 可以是五花八门的文件,比如 txt 文件、xml 文件、csv 文件、json 文件…

Map 具体怎么处理呢?

我们看 Hadoop mapper interface的java doc, 它所定义的 map function 的输入是一个 key value pair.

实际上,Hadoop 会把每个 mapper 的输入数据再次分割,分割成一个个 key value pair, 然后为每一个 key value pair invoke map function once. 为了这一步分割,Hadoop 使用到另一个 class: RecordReader. 它主要的方法是 next(), 作用就是从 InputSplit 读出一条 key value pair.

RecordReader 可以被定义在每个 InputFormat class中。当我们通过 JobClient.setInputFormat() 告诉 Hadoop inputForma class name 的时候, RecordReader 的定义也一并传递。

如何调配 input 才能让 Hadoop 发挥最大效能?

原则:

  • 从我们开始的例子可以发现,为每个 mapper function 平均分配任务会对 Job 整体性能有很大帮助;
  • 尽量同时使用所有 mapper host 资源,在数据处理总量不变的情况下,使用更多的 host 会让 job 总时间更短,不要让一个 InputSplit 太大;
  • 同时,inputSplit 也不要太小。太小的 Split 容易造成整个 cluster 要处理太多 map Task. JobTracker 通过 TaskTracker heartbeat 来通知任务,这种 heartBeat 通常几秒,如果存在太多的 map Task,则总等待时间很长;而且,每一个 map Task 都要由一个 JVM 来完成,启动 JVM 和收回 JVM成本都很高。(尽管 JVM reuse 可以减少这个问题,但是 schedule mapTask 依然要花不少时间)

基于以上原则,在实践中,我们要:

  • 如果处理数据在 HDFS 中,而且都是大文件。通常使用 default 的 TextInputFormat 就够了。它把输入文件每一个 block 分成一个 split, 默认 64MB.
  • 如果处理超大文件, 比如 >1TB, 而又不能增加 host 数目。考虑增加配置参数 mapred.min.split.size, 这样每个 mapper 处理更多数据,而总 mapper数减少。
  • 如果 mapper 平均处理时间太短,比如短于 1min, 而 mapper 数目又太多。可以增加配置参数 mapred.min.split.size, 这样每个 mapper 处理更多数据,总 mapper 数减少。
  • 如果处理的都是大量 HDFS 中的小文件,可以使用 CombineTextInputFormat.
  • 使用 Hadoop 本身提供的 InputFormat class. Hadoop community 提供的 InputFormat 毕竟是多年经验积累,可以优化处理不少问题。当然,如果你的 input 很有个性,或者你想要的 split 算法需要很有个性,大胆写自己的 InputFormat 吧!

笔者就曾经开发过一个 Hadoop Application, 输入文件是很多行远端文件系统的文件路径,需要 mapper 自己去下载,这种情况下就只能写自己的InputFormat 和 RecordReader 了。

期待下一篇

通过以上学习,希望大家可以对 Hadoop 输入优化有个比较深入的了解。下一篇我们将为大家介绍 Reduce阶段的具体细节以及优化方式。敬请关注!

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2015-09-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 包子铺里聊IT 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 不对 input 做精心处理有可能彻底毁掉 Hadoop 的强大功能
  • Hadoop 是怎么处理 input 的?
  • 如何调配 input 才能让 Hadoop 发挥最大效能?
  • 期待下一篇
相关产品与服务
云硬盘
云硬盘(Cloud Block Storage,CBS)为您提供用于 CVM 的持久性数据块级存储服务。云硬盘中的数据自动地在可用区内以多副本冗余方式存储,避免数据的单点故障风险,提供高达99.9999999%的数据可靠性。同时提供多种类型及规格,满足稳定低延迟的存储性能要求。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档