本章介绍如何使用 Python 借助 Hadoop Streming 来完成 MapReduce 任务。
其实 Hadoop Streming 很简单,但是我在网上搜索学习的时候,发现好多文章内容都是类似的,而且还有些晦涩难懂,故自己记录下完整的过程,以便能帮到更多学习的人。
本次是基于 Hadoop 伪分布式环境搭建 这篇文章中的环境来操作的。
Hadoop Streaming提供了一个便于进行MapReduce编程的工具包,使用它可以基于一些可执行命令、脚本语言或其他编程语言来实现Mapper和 Reducer,从而充分利用Hadoop并行计算框架的优势和能力,来处理大数据。需要注意的是,Streaming方式是基于Unix系统的标准输入输出来进行MapReduce Job的运行,它区别与Pipes的地方主要是通信协议,Pipes使用的是Socket通信,是对使用C++语言来实现MapReduce Job并通过Socket通信来与Hadopp平台通信,完成Job的执行。任何支持标准输入输出特性的编程语言都可以使用Streaming方式来实现MapReduce Job,基本原理就是输入从Unix系统标准输入,输出使用Unix系统的标准输出。 ——— 这段是我复制粘贴的。
简单点说就是 Hadoop 是使用 Java 语言编写的,操作起来,自然也是 Java 方便一些,但是 Hadoop 提供了一种通用的方式,即从标准输入输出上处理,所以凡是支持从标准输入输出读写的语言或脚本就都可以来编写一个 MapReduce 程序。
大概就是这样,先跑一个经典的 WordCount 的小例子来看看吧。
先跑一个简单的程序,以测试程序运行,创建 words.txt
:
123 | hello hadoophello pythonhadoop streming |
---|
MapReduce 顾名思义就是 Map 和 Reduce 两个过程。既然是 WordCount 这个统计单词出现次数的程序,那么我们先将所有的单词提取出来,并标记为 <Word, Count>
格式,这里不做 Count 处理,所有都记作 1。
12345678 | #!/usr/bin/env python#coding=utf-8import sysfor line in sys.stdin: lin = line.strip() #去除两端空格 words = line.split() #切割每个单词 for word in words: print"%s\t%s"%(word, 1) #输出到标准输出中,以便 Reduce 程序接收 |
---|
使用 Hadoop Streming 还有一个好处就是测试很方便,不用放到集群中运行,用 Linux 的管道即可完成测试。执行命令:cat words.txt | python mapper.py
。
运行结果应该是:
123456 | hello 1hadoop 1hello 1python 1hadoop 1streming 1 |
---|
对于上面 Mapper 处理的结果,我们应该放到一个 Dict 里,结果也是 <Word, Count>
,然后单词每出现一次,Count +1 即可。
123456789101112 | #!/usr/bin/env python#coding=utf-8import sysword_dict = {}for line in sys.stdin: line = line.strip() #取出两顿空格 word, count = line.split('\t', 1) word_dict.setdefault(word, 0) word_dictword += int(count)for key, value in word_dict.items(): print key, value |
---|
然后再用 Linux 管道本地测试一下:
1 | cat words.txt | python mapper.py | python reducer.txt |
---|
运行结果:
1234 | python 1hello 2streming 1hadoop 2 |
---|
如此即完成了一个 Hadoop Streaming 的程序,并完成了本地测试,那么接下来打包部署到 Hadoop HDFS 上来操作吧。
我们先找几个稍微大点的文件来测试。
123 | wget http://www.gutenberg.org/files/1342/1342-0.txt -O test1.txtwget http://www.gutenberg.org/ebooks/345.txt.utf-8 -O test2.txtwget http://www.gutenberg.org/ebooks/2443.txt.utf-8 -O test3.txt |
---|
在 HDFS 上创建一个目录用来存放测试数据源,并把这三个文件放到该目录中。
123 | hadoop fs -mkdir /input hadoop fs -put test*.txt /input |
---|
先用 find
命令在 $HADOOP_HOME
中找到 hadoop-streaming-x.x.x.jar
的绝对路径,然后创建一个 start.sh
文件,用来执行部署:
1234567 | hadoop jar /root/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-streaming-2.7.1.jar -input /input/*.txt -output /output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py \ |
---|
命令解释:
-input
:输入文件在 HDFS 中路径-output
:输出文件路径,注意这是输出目录,不是输出文件名-mapper
:用户自己写的 mapper 程序,可以是可执行文件或者脚本-reducer
:用户自己写的 reducer 程序,可以是可执行文件或者脚本-file
:打包文件到提交的作业中,可以是 mapper 或者 reducer 要用的输入文件,如配置文件,字典等。 这个一般是必须有的,因为 mapper 和 reducer 函数都是写在本地的文件中,因此需要将文件上传到集群中才能被执行。然后给予 start.sh
777权限后执行:
12 | chmod 777 start.sh./start.sh |
---|
经过一系列过程后,执行完成,然后我们来查看下 HDFS 里的 /input
目录下的内容:
1234 | $ hadoop fs -ls /inputFound 2 items-rw-r--r-- 1 root supergroup 0 2017-11-01 15:38 /output/_SUCCESS-rw-r--r-- 1 root supergroup 571618 2017-11-01 15:38 /output/part-00000 |
---|
然后大家可以使用命令查看生成结果,或从 HDFS 取回本地后查看结果,命令忘记的可以查看这篇文章:Hadoop HDFS 常用文件操作命令