前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >图解大数据 | 实操案例-MapReduce大数据统计

图解大数据 | 实操案例-MapReduce大数据统计

原创
作者头像
ShowMeAI
发布2022-03-08 18:09:09
8290
发布2022-03-08 18:09:09
举报
文章被收录于专栏:ShowMeAI研究中心ShowMeAI研究中心

作者:韩信子@ShowMeAI

教程地址http://www.showmeai.tech/tutorials/84

本文地址http://www.showmeai.tech/article-detail/170

声明:版权所有,转载请联系平台与作者并注明出处

1.引言

本教程ShowMeAI详细给大家讲解Hadoop使用Map-Reduce进行数据统计的方法,关于Hadoop与map-reduce的基础知识,大家可以回顾ShowMeAI的基础知识讲解篇分布式平台Hadoop与Map-reduce详解

尽管大部分人使用Hadoop都是用java完成,但是Hadoop程序可以用python、C++、ruby等完成。本示例教大家用python完成MapReduce实例统计输入文件的单词的词频。

  • 输入:文本文件
  • 输出:单词和词频信息,用 \t 隔开

2.Python实现 MapReduce 代码

使用python完成MapReduce需要利用Hadoop流的API,通过STDIN(标准输入)、STDOUT(标准输出)在Map函数和Reduce函数之间传递数据。

我们会利用Python的sys.stdin读取输入数据,并把我们的输出传送给 sys.stdout。Hadoop流将会完成其他的工作。

一个抽象的Hadoop大数据处理流程如下图所示

对于本文提到的任务,我们做一个更详细的拆解,整个Hadoop Map-Reduce过程如下图所示

从上图,我们可以看到,我们在当前任务中,需要核心通过代码完成的步骤是:

  • Map:产生词与次数标记键值对
  • Reduce:聚合同一个词(key)的值,完成统计

下面我们来看看,通过python如何完成这里的 Map 和 Reduce 阶段。

2.1 Map阶段:mapper.py

在这里,我们假设map阶段使用到的python脚本存放地址为 ShowMeAI/hadoop/code/mapper.py

代码语言:python
复制
#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

解释一下上述代码:

  • 文件从STDIN读取文件。
  • 把单词切开,并把单词和词频输出STDOUT。
  • Map脚本不会计算单词的总数,而是直接输出 1(Reduce阶段会完成统计工作)。

为了使脚本可执行,增加 mapper.py 的可执行权限:

代码语言:txt
复制
chmod +x ShowMeAI/hadoop/code/mapper.py

2.2 Reduce阶段:reducer.py

在这里,我们假设reduce阶段使用到的python脚本存放地址为 ShowMeAI/hadoop/code/reducer.py

代码语言:python
复制
#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count如果不是数字的话,直接忽略掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要忘记最后的输出
    print "%s\t%s" % (current_word, current_count)

文件会读取 mapper.py 的结果作为 reducer.py 的输入,并统计每个单词出现的总的次数,把最终的结果输出到STDOUT。

为了是脚本可执行,增加 reducer.py 的可执行权限

代码语言:txt
复制
chmod +x ShowMeAI/hadoop/code/reducer.py

3.本地测试MapReduce流程

通常我们在把数据处理流程提交到集群进行运行之前,会本地做一个简单测试,我们会借助linux的管道命令 (cat data | map | sort | reduce) 对数据流进行串接,验证我们写的 mapper.pyreducer.py脚本功能是否正常。这种测试方式,能保证输出的最终结果是我们期望的。

测试的命令如下:

代码语言:txt
复制
cd ShowMeAI/hadoop/code/
echo "foo foo quux labs foo bar quux" | python mapper.py
echo ``"foo foo quux labs foo bar quux"` `| python mapper.py | sort -k1, 1  | python reducer.py

其中的sort过程主要是完成以key为基准的排序,方便reduce阶段进行聚合统计。

4.Hadoop集群运行python代码

4.1 数据准备

我们对以下三个文件进行词频统计,先根据下述路径下载:

将文件放置到 ShowMeAI/hadoop/datas/ 目录下。

4.2 执行程序

把本地的数据文件拷贝到分布式文件系统HDFS中。

代码语言:txt
复制
bin/hadoop dfs -copyFromLocal ShowMeAI/hadoop/datas  hdfs_in

查看:

代码语言:txt
复制
bin/hadoop dfs -ls

查看具体的文件:

代码语言:txt
复制
bin/hadoop dfs -ls /user/showmeai/hdfs_in

执行MapReduce job:

代码语言:txt
复制
bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file ShowMeAI/hadoop/code/mapper.py     -mapper ShowMeAI/hadoop/code/mapper.py \
-file ShowMeAI/hadoop/code/reducer.py    -reducer ShowMeAI/hadoop/code/reducer.py \
-input /user/showmeai/hdfs_in/*    -output /user/showmeai/hdfs_out

实例输出:

查看输出结果是否在目标目录 /user/showmeai/hdfs_out

代码语言:txt
复制
bin/hadoop dfs -ls /user/showmeai/hdfs_out

查看结果:

代码语言:txt
复制
bin/hadoop dfs -cat /user/showmeai/hdfs_out2/part-00000

输出:

5.Mapper 和 Reducer代码优化

5.1 python中的迭代器和生成器

我们这里对Map-Reduce的代码优化主要基于迭代器和生成器,对这个部分不熟悉的同学可以参考ShowMeAI的python部分内容 → 《图解python | 迭代器与生成器》

5.2 优化Mapper 和 Reducer代码

代码语言:python
复制
mapper.py
#!/usr/bin/env python
import sys
def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print "%s%s%d" % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py
#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator = '\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator = '\t'):
    data = read_mapper_output(sys.stdin, separator = separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except valueError:
            pass

if __name__ == "__main__":
    main()

我们对代码中的groupby做一个简单代码功能演示讲解,如下:

代码语言:python
复制
from itertools import groupby
from operator import itemgetter

things = [('2009-09-02', 11),
          ('2009-09-02', 3),
          ('2009-09-03', 10),
          ('2009-09-03', 4),
          ('2009-09-03', 22),
          ('2009-09-06', 33)]

sss = groupby(things, itemgetter(0))
for key, items in sss:
    print key
    for subitem in items:
        print subitem
    print '-' * 20

结果:

代码语言:txt
复制
2009-09-02
('2009-09-02', 11)
('2009-09-02', 3)
--------------------
2009-09-03
('2009-09-03', 10)
('2009-09-03', 4)
('2009-09-03', 22)
--------------------
2009-09-06
('2009-09-06', 33)
--------------------

代码中:

  • groupby(things, itemgetter(0)) 以第0列为排序目标
  • groupby(things, itemgetter(1)) 以第1列为排序目标
  • groupby(things) 以整行为排序目标

6.参考资料

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.引言
  • 2.Python实现 MapReduce 代码
    • 2.1 Map阶段:mapper.py
      • 2.2 Reduce阶段:reducer.py
      • 3.本地测试MapReduce流程
      • 4.Hadoop集群运行python代码
        • 4.1 数据准备
          • 4.2 执行程序
          • 5.Mapper 和 Reducer代码优化
            • 5.1 python中的迭代器和生成器
              • 5.2 优化Mapper 和 Reducer代码
              • 6.参考资料
              相关产品与服务
              大数据
              全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档