12分钟

任务1 编写MapReduce可执行程序

任务目的

由Hadoop Streaming工作原理可知,我们需要两个可执行文件作为Mapper和Reducer,向集群提交Map/Reduce作业,完成词频统计的任务。

此任务将编写两个Python脚本文件,分别为mapper.pyreducer.py

mapper.py逻辑:

1.首先jieba模块会打印一些自己的日志,我们需要将这些日志的等级降低,就不会被打印了。

2.Hadoop streaming会将输入的小说文件转化为标准输入传递给Mapper可执行程序,此时读取每一行并分词,并打印Reducer需要的格式(格式为"key value"(中间为制表符\t))。

输出格式为:

李逵	1
吴用	1
人马	1

reducer.py逻辑:

1.Hadoop Streaming会将Mapper的输出汇总并将相同词语排到一起。转化为标准输入传递给Reduce可执行程序。

格式如下:

李逵	1
李逵	1
李逵	1
洒家	1
宋江	1
宋江	1
.....

2.对这些输入进行计数,如果词语变化,说明上个词语的词频已经统计完毕,将上个词频输出,再次对下一个词语进行计数,这样会导致最后一次没有直接打印,所以在循环外打印最后一个词语的词频。

输出如下:

宋江	2725
人马	1234
洒家	235
....

任务步骤

1.编写mapper.py文件

脚本内容如下:

#!/usr/bin/python
# coding:utf-8
import jieba
import sys  
import logging

jieba.setLogLevel(logging.INFO)   # 不显示jieba模块的日志输出
for line in sys.stdin:
    word_list = jieba.cut(line.strip()) # 将每行文字进行拆分
    for word in word_list:
        try:
            print ("%s\t1"%(word.encode("utf8")))  # 以"word  value"格式输出,中间为制表符
        except:
            pass

2.编写reducer.py文件

#!/usr/bin/python
# coding:utf-8
import sys

current_word,current_count,word = None, 1, None
# 读取标准输入
for line in sys.stdin:
    try:
        line = line.rstrip()
        word, count = line.split("\t", 1)
        count = int(count)
    except:
        continue
    # 当前词语没有变化,就一直计数
    if current_word == word:
        current_count += count
    # 已经换到下一个词语,打印上一个词语的词频,并将当前词语切换。
    else:
        if current_word:
            print(("%s\t%u")%(current_word, current_count))
        current_count, current_word = count, word

# 处理最后一次循环词语和词频没有被打印的问题。
if current_word == word:
    print(("%s\t%u")% (current_word, current_count))