当面试公司问起 Hadoop 经验时,我们当然不能只停留在 Mapper 干了什么、Reducer 干了什么。没有 Performance Tuning 怎么能显示出我们的高大上呢?
下面几篇文章,包子培训将深入浅出的讲解 advanced hadoop tuning. 力争让你在面试中滔滔不绝,震住你的面试官。:)
大家还记得我们第一篇 <5分钟零基础搞懂 Hadoop> 中的例子吗?
我们把数据分别存贮在好多台机器上,然后让各台机器处理自己上的数据。
但是, 实际情况中有可能发生:
Host A 上数据超级多, 其他所有 Host B, C, D…上超级少。
假设每台机器处理数据的速度是一样的,比如 CPU、内存、硬盘性能一样,那么显然 host A 要比其他 host 花更多时间处理数据。这会造成当 host B,C,D…都处理完数据后,一直等待 host A 处理。
我们想一想, 如果可以让 host A 上的一些数据转移到 B,C,D…上,让所有 host 有同样多的数据,那我们的整个 Hadoop Job 就可以更快完成了!
企业从实际情况中总结的经验告诉我们,节省的时间不只几分钟,长的可以到几小时!
如果这个情况发生在 Hadoop 集群里,考虑到 Hadoop 中的每台机器需要在 mapper 中把处理结果传给 reduce 阶段,由于 host A 上的数据远多于其他 host, host A 花在处理数据的时间,加上 disk 读写的时间,加上花在网络传输数据的时间,都会比其他机器多更多!
所以,尽量平均分配每台机器要处理的数据,真是势在必行!
那我们应当如何平衡每台机器的负担呢?我们先了解一下 Hadoop 如何读取数据的吧,只有搞清楚工具本身,我们才能更好的利用它。
大家还记得我们上篇文章里介绍用户如何向 JobTracker 提交 Job 吗? (点击 阅读原文 查看上一篇文章)
用户的代码,无论是 Java 还是别的语言,都需要使用 Hadoop 提供的一个 client side library JobClient. 它是用户代码和 JobTracker 打交道的接口。
运行 JobClient.runJob() 会让 JobClient 把所有 Hadoop Job 的信息,比如 mapper reducer jar path, mapper / reducer class name, 输入文件的路径等等,告诉给 JobTracker 并开始运行这个 Job.
除此之外,JobClient.runJob() 还会做一件事:使用 InputFormat class 去计算如何把 input file 分割成一份一份,然后交给 mapper 处理。
inputformat.getSplit() 函数返回一个 InputSplit 的 List, 每一个 InputSplit 就是一个 mapper 需要处理的数据。
通常,我们把 input file 存在 HDFS,(我们上篇文章提到的分布式文件系统)HDFS 可以存储很大很大的文件,为了让普通计算机可以存储这么大的文件,HDFS 把这个文件分割成若干小块 (block),然后把这些小块分别存储在不同机器上,HDFS 会记住每个小块存储的机器信息以及路径。这样,当用户需要读取一个文件时,只要告诉 HDFS 文件名,HDFS 就可以到存储这个文件的那些机器上把文件小块组合起来,返回给用户。
一个 Hadoop Job的 input 既可以是一个很大的 file, 也可以是多个 file; 无论怎样,getSplit() 都会计算如何分割 input.
我们需要注意的是:输入文件在 HDFS 中以小块形式存储在不同机器上,那 Split 需要包含哪些内容才能让每个 mapper 清楚的知道它要处理的数据在哪里呢?
我们仔细想一想,在一个分布式环境中, 能够唯一的确定一个文件的位置的信息应该包括:
我们还需要注意:Split 有可能是文件一个整小块,或者多个整小块,那以上信息就够了。但是当 getSplit() 平均分配时,很有可能一个 split 得到的是小块当中的一部分,或者几个整小块加上一个小块的一 部分。那么我们还需要
这样, 每个 Split 就可以包括一个 mapper 要处理的数据的精确位置:文件在哪儿、文件中的哪些数据。
注意,Split 里存储的都是待处理数据的位置信息,并不是数据本身。
我们打开 inputFormat 的 java doc 可以发现,它实际上是个 interface, 需要 class 来继承,提供分割 input 的逻辑。
Jobclient 有一个方法叫 setInputFormat(), 通过它,我们可以告诉 JobTracker 想要使用的 InputFormat class 是什么。如果我们不设置,Hadoop默认的是 TextInputFormat, 它默认为文件在 HDFS上的每一个 Block 生成一个对应的 InputSplit. 所以大家使用 Hadoop 时,也可以编写自己的 input format, 这样可以自由的选择分割 input 的算法,甚至处理存储在 HDFS 之外的数据。
我们上篇文章说 JobTracker 尽量把 mapper 安排在离它要处理的数据比较近的机器上,以便 mapper 从本机读取数据,节省网络传输时间。具体实现是这么回事儿:对于每个 map task, 我们知道它的 split 包含的数据所在的 host 位置,我们就把 mapper 安排在那个 host 上好了,至少是比较近的host. 你可能会问:split 里存储的 host 位置是 HDFS 存数据的 host 啊,和 MapReduce 的 host 有什么相关呢?为了达到数据本地性,其实通常把MapReduce 和 HDFS 部署在同一组 host 上。
既然一个 InputSplit 对应一个 map task, 那么当 map task 收到它所处理数据的位置信息,它就可以从 HDFS 读取这些数据了。HDFS 上的 input file 可以是五花八门的文件,比如 txt 文件、xml 文件、csv 文件、json 文件…
Map 具体怎么处理呢?
我们看 Hadoop mapper interface的java doc, 它所定义的 map function 的输入是一个 key value pair.
实际上,Hadoop 会把每个 mapper 的输入数据再次分割,分割成一个个 key value pair, 然后为每一个 key value pair invoke map function once. 为了这一步分割,Hadoop 使用到另一个 class: RecordReader. 它主要的方法是 next(), 作用就是从 InputSplit 读出一条 key value pair.
RecordReader 可以被定义在每个 InputFormat class中。当我们通过 JobClient.setInputFormat() 告诉 Hadoop inputForma class name 的时候, RecordReader 的定义也一并传递。
原则:
基于以上原则,在实践中,我们要:
笔者就曾经开发过一个 Hadoop Application, 输入文件是很多行远端文件系统的文件路径,需要 mapper 自己去下载,这种情况下就只能写自己的InputFormat 和 RecordReader 了。
通过以上学习,希望大家可以对 Hadoop 输入优化有个比较深入的了解。下一篇我们将为大家介绍 Reduce阶段的具体细节以及优化方式。敬请关注!