MapReduce与批处理------《Designing Data-Intensive Applications》读书笔记14

之前的文章大量的内容在和大家探讨分布式存储,接下来的章节进入了分布式计算领域。坦白说,个人之前专业的重心侧重于存储,对许多计算的内容理解可能不是和确切,如果文章中的理解有所不妥,愿虚心赐教。本篇将和大家聊一聊分布式计算的一个子集:批处理

批处理系统通常也叫脱机系统,需要大量的输入数据,运行一个作业来处理它,并产生一些输出数据。工作通常需要一段较长的时间(从几分钟到几天)。批处理作业通常是周期性地运行的(例如,一天一次)。批处理作业的主要性能度量通常是吞吐量

1.MapReduce

批处理是我们构建可靠、可扩展和可维护应用程序的重要组成部分。而谷歌在2004年发布的批处理算法:MapReduce,是处理大规模数据集的重要模型,虽然与为数据仓库专门开发的并行处理系统相比,MapReduce是一种相当低级的编程模型,但它依然对批处理的模型理解有很大的帮助,所以我们以MapReduce作为起点,开启我们的批处理的计算之旅。

分布式存储系统与MapReduce

MapReduce是一种相当生硬,野蛮的工具,但却十分有效。单个MapReduce作业:可以有一个或多个输入,并生成一个或多个输出。 MapReduce作业是函数式编程的模型,不会修改输入,除了生成输出之外,不会产生任何副作用。输出文件按顺序编写一次(不修改已写入文件的任何现有部分)。

MapReduce作业需要读、写文件的分布式文件系统。如:HDFS,GFS,GlusterFS,Amazon S3 等等。之后我们使用HDFS作为运行环境,但这些原则适用于任何的分布式存储系统。HDFS是基于无共享的存储集群,而共享磁盘存储由集中式存储设备实现,通常使用定制硬件和特殊的网络基础设施(如光纤通道)。所以HDFS不需要特殊的硬件,只需要由传统的数据中心网络连接的计算机。HDFS的守护进程的每台计算机上运行,将允许其他节点访问存储在该计算机上文件的数据,而中央服务器NameNode跟踪哪些文件块存储在哪台机器。因此,创建一个大的文件HDFS上,可以使用集群之中的所有计算机。

为了容忍机器和磁盘故障,可以在集群的多台机器上复制文件块。所以多台机器上的同一数据的几个副本,当然这里也可以使用纠删码技术,可以允许丢失的数据以比完全复制更低的存储开销被存储。纠删码技术类似于RAID,它在同一台机器上的多个磁盘上提供冗余。不同之处在于,对纠删码的副本进行读写时需要额外的编解码计算。

MapReduce的工作流程

MapReduce与传统的UNIX命令管道的主要区别在于,MapReduce可以跨越多台计算机并行计算,手动编写的Mapper和Reducer不需要了解输入来自何处或输出的去处,由框架来处理机器之间移动数据的复杂性。

下图展示了一个MapReduce作业的工作流程,作业的输入是HDFS的一个目录,目录内每个文件块作为一个单独的分区,由一个单独的Map任务处理,每个输入文件的大小通常是数百兆字节(取决于HDFS的块大小)。MapReduce的调度器试图在存储输入文件副本块机器上运行Mapper,只要该机器有足够内存和CPU资源来运行Map任务。通过这样的方式节省了在网络上复制文件块的带宽,减少了网络负载,利用了分布式计算的局部性原理。

MapReduce的工作数据流

应用程序代码被打包成Jar文件,上传到分布式存储系统之上,对应的节点会下载应用程序的Jar文件,然后启动Map任务并开始读取输入文件,每次将一条记录传递给Mapper的回调函数,并输出Map处理过后的键值对。Map的任务的数量取决于输入文件块的数量,但是Reduce任务的数量由作业作者配置,为了确保同一个键的所有键值对都由同一个Reducer处理,框架使用一个散列键来确定键值对应该对应的Reduce任务。

MapReduce需要对键值对进行排序,但数据集可能太大,无法用一台机器上的常规排序算法进行排序。所以,每个Map任务根据散列将键值对输出到对应的Reducer的磁盘分区,并对键值对进行排序。每当Mapper完成工作时,MapReduce调度器通知Reducer,它们可以开始从Mapper获取输出文件。Reducer从Mapper端获取对应的输出的键值对文件,并进行归并排序,保持排序顺序,这个过程称之为Shuffle。

最后,Reducer调用Reduce函数来处理这些有序的键值对,并且可以生成任意数量的输出记录,并写入分布式存储系统。这便是一次完整的MapReduce任务的全过程。

MapReduce作业的链式调度

一个MapReduce作业可以解决的问题范围是有限的。因此,MapReduce的作业需要被链接到工作流中,这样一个作业的输出就成为下一个作业的输入。Hadoop的MapReduce框架,可以隐式的通过目录名来链接:第一个MapReduc的作业配置写输出到HDFS的指定的目录,第二个MapReduce作业读取相同的目录名作为输入。从MapReduce的框架来看,它们是两个独立的工作。

只有当前一个作业成功完成时,下一个作业的输入才会被认为是有效的(失败的MapReduce作业的结果会被丢弃)。所以不同的作业之前会产生依赖关系的有向无环图,来处理这些依赖关系的工作执行,目前Hadoop有许多对批处理的调度程序,如:Oozie,Azkaban, Luigi, Airflow,等。在一个大型公司之中,许多不同的团队可能运行不同的工作,它们读取彼此的输出,所以通过工具支持管理等复杂的数据流是很重要的。

2.MapReduce作业的业务场景

我们通过一个实例,来具体了解类MapReduce作业的业务场景。如下图所示:左边是一个由日志记录的行为描述,称为用户活动,右边是一个数据库的用户用户表。

用户活动日志与用户的信息表

数据分析人员的任务可能需要将用户活动与用户的信息关联起来:分析哪些页面最受年龄组的欢迎。但是用户活动日志之中,只包含了用户的ID,而不包含完整的用户信息。这时候就需要一个Join操作,最简单的实现思路是逐一检查用户活动,并对每个用户ID来查询用户数据库,显然,这样的实现会带来很糟糕的性能表现。任务吞吐量将受到数据库服务器往返时间的限制,并且本地缓存的有效性将非常依赖于数据的分布,并行运行海量的查询可能会超出数据服务器的处理能力。为了在作业过程之中有更大的吞吐量,计算必须(尽可能地)在一台机器上进行。通过网络上随机访问请求要处理的每一条记录是十分缓慢的。此外,查询远程数据库将意味着批处理作业变得不确定,因为远程数据库中的数据随时可能会更改。

因此,更好的方法是获取用户数据库的副本(使用ETL将数据库的数据中提取到“数据仓库”),并将其放入分布式存储系统之中。这样,我们可以使用MapReduce这样的工具来更加有效地处理。如下图所示:由MapReduce框架按键对Mapper输出进行分区,然后对键值对排序时,其效果是所有活动事件和具有相同用户ID的用户记录在同一个Reducer之中并且彼此相邻。之后,Reducer可以很容易地执行实际的Join逻辑:每个用户ID都调用一次Reduce函数,输出活动的URL和用户的年龄。随后可以启动一个新的MapReduce作业来计算每个URL的查看器年龄分布,并按年龄组分组。

通过MapReduce作业来处理的业务逻辑

接下来,我们来梳理一些业务层面的细节,以及用MapReduce框架的一些细节:

  • 业务逻辑分离 在上述的业务场景之中,最重要的就是保证同一个用户ID的活动需要汇集到同一个Reducer来进行处理,这个就是前文我们聊到Shuffle的功能,所有键值相同的键值对都会被传递到相同的目的地。MapReduce编程模型将计算的通信协作与应用程序逻辑处理分离。这就是MapReduce框架的高明之处,由MapReduce的框架本身处理所有的网络通信,业务人员专注于应用程序代码的实现,如果在这个过程之中出现了节点的故障,MapReduce透明的失败重试来确保应用程序逻辑不受影响。
  • 数据分组 数据除了Join场景之外,通过键值对对数据进行分组也是数据系统常用的操作:对所有具有相同键的记录都形成一个组,之后对组内的数据进行操作。 现在问题来了?我们怎么样使用MapReduce来实现这样的分组操作呢?实现方式也很简单,通过在Map函数之中对键值对进行改造,插入使键值对产生预期分组的Key,之后分区和排序将相同的Key汇集到同一个Reducer之中。在MapReduce上实现时,分组和Join看起来非常相似。
  • 数据倾斜 如果同一个键相关的数据量非常大,对于MapReduce框架来说可能会成为一个挑战,因为相同键会汇集到同一个Reducer进行处理。例如,在社交网络中,少数名人可能有数以百万计的追随者。(第一章我们就举过这个例子)所以在MapReduce作业之中存在数据倾斜,如何来进行补偿呢?在Pig之中,会先运行一个采样任务来确定哪个键是热的,在作业实际执行时,Mapper会把出现数据倾斜的键值对通过随机选择分发个指定的多个Reducer。而Hive的倾斜连接优化采用了另一种方法。它需要在表元数据中显式指定热键,它将与这些键相关的记录存储在元数据之中,后续对表进行操作时,采用类似于Pig的优化思路。

3.批处理的意义

前文已经讨论了MapReduce作业的工作流程,现在我们回到一个问题来:所有处理的结果是什么?为什么我们一开始就要做所有这些工作? 批处理操作的核心是对数据系统之中的数据进行解析,这类操作需要扫描大量的记录,进行分组和聚合,并输出到数据库以报告的形式呈现,通过报告给消费者或分析师进行数据决策。

同样,批处理适合建立搜索索引。谷歌最初使用MapReduce是为它的搜索引擎构建索引,通过5到10个MapReduce作业的工作流来实现实现的。如果需要执行全文搜索一组文件中,通过批处理过程是一个非常有效的方法:由每个Map任务对数据分区,之后每个Reducer建立分区索引,将索引文件写入到分布式文件系统。因为通过关键字查询搜索索引是只读操作,这些索引文件在创建后是不可变的。 如果索引的文档集发生变化,一个选项是周期性地为整个文档集重新运行整个索引工作流程,并在完成新索引文件时将以前的索引文件替换为新的索引文件。(如果只是少量文件的变化,则不适用批处理任务进行处理)

批处理的作业的将输入视为不可变且避免副作用(如向外部数据库写入),不仅实现了良好的性能,而且变得更容易维护。如果您在代码中引入了一个bug,输出错误,可以简单地回滚到以前版本的代码并重新运行该作业,并且再次输出正确的结果。更简单的解决方案,可以将旧输出保存在不同的目录中,然后简单地进行切换。由于这种易于回滚的特性,功能开发可以比在不能回滚的环境中进行得更快。有利于敏捷的软件开发。批处理将逻辑处理代码与配置分离,这里便允许优雅地重用代码:一个团队可以专注于实现逻辑处理,而其他团队可以决定何时何地运行该作业。

小结:

本篇我们梳理了MapReduce的处理框架,并探讨了许多批处理作业的特点。除了MapReduce的模型,数据系统中仍然有许多处理数据的计算模型,接下来会和大家来继续探讨数据系统之中的计算模型..............

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏安富莱嵌入式技术分享

【RL-TCPnet网络教程】第21章 RL-TCPnet之高效的事件触发框架

本章节为大家讲解高效的事件触发框架实现方法,BSD Socket编程和后面章节要讲解到的FTP、TFTP和HTTP等都非常适合使用这种方式。实际项目中也推荐大家...

12040
来自专栏CSDN技术头条

变不可能为可能,Tachyon帮助Spark变小时级任务到秒

本文作者是Gianmario Spacagna和Harry Powell,Barclays的数据科学家。 集群计算和大数据技术已经取得了很多进展,不过现在很多大...

20680
来自专栏安富莱嵌入式技术分享

【RL-TCPnet网络教程】第6章 RL-TCPnet底层驱动说明

本章节为大家讲解RL-TCPnet的底层驱动,主要是STM32自带MAC的驱动实现和PHY的驱动实现。

19820
来自专栏Golang语言社区

机器人 Go 语言库:Gobot

Gobot 是为机器人和物理计算所设计的一组 Go 语言库,提供在同一时间合并多个不同设备的简单且强大的解决方案。 package main import (...

1.1K50
来自专栏华章科技

成为大数据顶尖程序员,先过了这些Hadoop面试题!(附答案解析)

导读:在大数据开发岗位的需求下,工资待遇水涨船高,不少编程人员在面对职业瓶颈期的时候,会选择转编程方向发展。

8720
来自专栏Spark学习技巧

Flink并行度

本节介绍如何在Flink中配置程序的并行执行。FLink程序由多个任务(转换/操作符、数据源和sinks)组成。任务被分成多个并行实例来执行,每个并行实例处理任...

43910
来自专栏Albert陈凯

详细探究Spark0.8的shuffle实现

Background 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这...

28650
来自专栏cloudskyme

hadoop使用(六)

第1章 引言 1.1 编写目的 介绍pig,一个不得不说的hadoop的扩展。 1.2 什么是pig Pig是一个基于Hadoop的大规模数据分析平台,它提供的...

35360
来自专栏杨建荣的学习笔记

system表空间不足的问题分析(r6笔记第66天)

很多事情见多了也就有了麻木的感觉,报警短信就是如此,每天总能收到不少的报警短信,可能很多时候就扫一眼,如果没有严重的问题自己是不会情愿打开电脑处理的。 对于此,...

28240
来自专栏牛客网

蚂蚁金服面经(3+4)

【每日一语】我和这个世界不熟。这并非是我撕裂的原因。我依旧有很多完整,至少我要成全我自己。──北岛《我和这个世界不熟》

23920

扫码关注云+社区

领取腾讯云代金券