12分钟
任务1 编写MapReduce可执行程序
任务目的
由Hadoop Streaming工作原理可知,我们需要两个可执行文件作为Mapper和Reducer,向集群提交Map/Reduce作业,完成词频统计的任务。
此任务将编写两个Python脚本文件,分别为mapper.py
和reducer.py
。
mapper.py
逻辑:
1.首先jieba模块会打印一些自己的日志,我们需要将这些日志的等级降低,就不会被打印了。
2.Hadoop streaming会将输入的小说文件转化为标准输入传递给Mapper可执行程序,此时读取每一行并分词,并打印Reducer需要的格式(格式为"key value"(中间为制表符\t))。
输出格式为:
李逵 1
吴用 1
人马 1
reducer.py
逻辑:
1.Hadoop Streaming会将Mapper的输出汇总并将相同词语排到一起。转化为标准输入传递给Reduce可执行程序。
格式如下:
李逵 1
李逵 1
李逵 1
洒家 1
宋江 1
宋江 1
.....
2.对这些输入进行计数,如果词语变化,说明上个词语的词频已经统计完毕,将上个词频输出,再次对下一个词语进行计数,这样会导致最后一次没有直接打印,所以在循环外打印最后一个词语的词频。
输出如下:
宋江 2725
人马 1234
洒家 235
....
任务步骤
1.编写mapper.py
文件
脚本内容如下:
#!/usr/bin/python
# coding:utf-8
import jieba
import sys
import logging
jieba.setLogLevel(logging.INFO) # 不显示jieba模块的日志输出
for line in sys.stdin:
word_list = jieba.cut(line.strip()) # 将每行文字进行拆分
for word in word_list:
try:
print ("%s\t1"%(word.encode("utf8"))) # 以"word value"格式输出,中间为制表符
except:
pass
2.编写reducer.py
文件
#!/usr/bin/python
# coding:utf-8
import sys
current_word,current_count,word = None, 1, None
# 读取标准输入
for line in sys.stdin:
try:
line = line.rstrip()
word, count = line.split("\t", 1)
count = int(count)
except:
continue
# 当前词语没有变化,就一直计数
if current_word == word:
current_count += count
# 已经换到下一个词语,打印上一个词语的词频,并将当前词语切换。
else:
if current_word:
print(("%s\t%u")%(current_word, current_count))
current_count, current_word = count, word
# 处理最后一次循环词语和词频没有被打印的问题。
if current_word == word:
print(("%s\t%u")% (current_word, current_count))
学员评价