前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DDIA:批中典范 MapReduce

DDIA:批中典范 MapReduce

作者头像
木鸟杂记
发布2023-12-19 16:51:12
1920
发布2023-12-19 16:51:12
举报
文章被收录于专栏:木鸟杂记木鸟杂记

MapReduce 在某种程度上有点像 Unix 工具,但不同之处在于可以分散到上千台机器上并行执行。和 Unix 工具一样,MapReduce 虽然看起来简单粗暴,但组合起来却非常强大。一个 MapReduce 任务就像一个 Unix 进程:接受一到多个输入,产生一到多个输出

和 Unix 工具一样,执行一个 MapReduce 任务不会修改输入文件,并且除了产生输出没有其他的副作用。输出文件都是单次写入、顺序追加而成(即 ,一旦文件写完,就不会再有任何改动)。

相比 Unix 工具使用 stdin 和 stdout 作为输入和输出,MapReduce 任务的输入和输出都是分布式文件系统上的文件。在 Hadoop 的 MapReduce 实现中,该文件系统被称为 HDFS(Hadoop Distributed File System),是谷歌文件系统(GFS,Google File System)的一个开源实现。

除了 HDFS 外,市面上还有很多其他的分布式文件系统,如 GlusterFS 和 QFS(Quantcast File System)。对象存储,如 Amazon S3,Azure Blob Storage 和 OpenStack Swift 也有诸多相似之处。本章我们主要以 HDFS 为例,但这些原则也适用于其他分布式文件系统。

HDFS 和对象存储不同点之一是,HDFS 能够将计算就近的调度到存储所在的机器上(调度亲和性,本质原因在于计算和存储在同一个集群,有好处也有劣势),但对象存储会将存储和计算分离。如果网络带宽是瓶颈,从本地进行文件读取要在性能上更优。需要注意的是,如果使用 EC 编码,这些局部性就会丧失,因为所有的读取都打散到多台机器上,然后还原出原始数据。

HDFS 基本设计理念是:shared-nothing (机器间不共享任何特殊硬件,纯通过网络来通信)架构。与shared-disk 架构(比如,Network Attached Storage,NAS 和 Storage Area Network,SAN)相对。Shared-disk 存储共享一个中心化的存储组件,常常会使用定制化的硬件和特殊的网络设施,如网状通道(Fibre Channel,一种高速网络互联技术)。而 Share-nothing 架构则不要求任何专用硬件,只要是一组使用常规数据中心网络串联起来的主机即可。

HDFS 由一组运行在每个主机上的守护进程(Daemon Process)组成,对外暴露网络接口,以使其他的节点可以访问存储于本机的数据文件(假设数据中心中的通用机器节点上都附有一定数量的磁盘)。一个叫做 NameNode中心节点会保存文件块和其所在机器的映射(也即文件块的 placement 信息)。因此,HDFS 可以利用所有运行有守护进程的机器上(DataNode)存储空间,在逻辑上对外提供单一且巨大的文件系统抽象。

当然现代大型数据中心的机型有一个专用化趋势,比如面向存储的机型,每个机器会挂很多硬盘,但计算能力较弱;比如面向 AI 的机型,会集成很多 GPU 。

为了容忍机器和磁盘的故障,所有的文件块(file blocks)都会在多机上进行冗余。冗余策略有两种主要流派:

  1. 多副本:在多机上存储同样数据的多个副本
  2. 冗余编码:使用纠删码erasure coding,如里索码,Reed-Solomon codes )的方式以更小的存储放大的方法对一份数据进行冗余。

后者类似于在单机多盘上提供数据冗余的 RAID(磁盘冗余阵列);不同之处在于,在 HDFS 这类分布式文件系统中,文件访问和数据冗余仅通过普通网络连接,而无需专用硬件的支持。

HDFS 能够很好地进行扩容:在本书写作时(2017 年),最大的 HDFS 集群运行在了数千台机器上,具有上 PB 的容量。这种尺度的数据存储服务的商业落地,得益于使用了开源软件,且廉价硬件的成本要远低于提供同等容量的专用硬件解决方案。

MapReduce 任务执行

MapReduce 是一个编程框架,你可以基于 MapReduce 编写代码以处理存储在分布式文件系统(如 HDFS)上的超大数据集。要想理解 MapReduce 的运行机制,最直观的还是举个例子。仍以上一小节提到的服务器日志分析为例。使用 MapReduce 进行数据处理的方式和使用 Unix 工具处理的方式很像:

  1. 读取一组输入文件,将其切分为记录(records)。在网站服务器日志的例子中,每个记录就是日志中的一行(即,使用 \n 作为记录分隔符)
  2. 调用 Mapper 函数从每个记录中抽取 key 和 value。在之前的例子中,mapper 函数是 awk '{print 7}' :抽取 URL(7)作为 key,value 留空。
  3. 将所有的 key-value 对按 key 进行排序。在前面例子中,该环节由 sort 承担。
  4. 调用 Reducer 函数对排好序的 kv 列表迭代处理。如果某个 key 出现了多次,排序环节会让其在在列表中集中到一块,因此可以在不在内存中保存过多状态的的情况下,对具有相同 key 的数据进行汇总处理。在前面例子中,reducer 对应命令 uniq -c ,功能是对所有具有相同 key 的记录值进行计数。

这四个步骤(split-map-sort-reduce)可以通过一个 MapReduce 任务来实现。你可以在步骤 2 (map)和步骤 4(reduce)编写代码来自定义数据处理逻辑。步骤 1 (将文件拆分成记录)由输入格式解析器(input format parser)来完成。步骤 3,排序阶段,由 MapReduce 框架隐式完成,所有 Mapper 的输出在给到 Reducer 前,框架都会对其进行排序。

为了创建 MapReduce 任务,你需要实现两个回调函数:mapper 和 reducer,其行为如下:

  • Mapper 对于每个输入记录都会调用一次 Mapper 函数,其任务是从记录中抽取 key 和 value。对于每一个输入记录,都有可能产生任意数量(包括 0 个)的 kv 对。框架不会保存任何跨记录的状态,因此每个记录都可以独立的被处理(即 Mapper 可以进行任意并发的运行)。
  • Reducer MapReduce 框架会拿到 Mapper 输出的 kv 对,通过排序将具有相同 key 的 value 聚集到一块,以迭代器的形式给到 Reducer 函数。reducer 会继续输出一组新的记录(如 URL 的出现频次)。

在网站服务器日志的例子中,我们在第五步还有一个 sort 命令,对所有 URL 按请求频次进行排序。在 MapReduce 中,如果你需要一个额外的(除了 reduce 前的那个排序)排序阶段,可以再实现一个 MapReduce 任务,将其与第一个接起来,即使用第一个 MapReduce 任务的输出作为输入。在这种情形下,第二个 MapReduce 任务的 Mapper 会将数据整理成适合排序的形式(将用于排序的字段抽取出来放到 key 中),然后 Reducer 对排好序的数据进行处理。

MapReduce 的分布式执行

与 Unix 工具流水线的相比,MapReduce 的最大区别在于可以在多台机器上进行分布式的执行,但并不需要用户显式地写处理并行的代码。mapper 和 Reducer 函数每次只处理一个记录;他们不必关心输入从哪里来,输出要到哪里去,框架会处理分布式系统所带来的的复杂度(如在机器间移动数据的)。

虽然可以使用 Unix 工具作为分布式计算中的 Mapper 和 reducer,但更为常见的是使用通用编程语言的函数来实现这两个回调。在 Hadoop MapReduce 中,mapper 和 Reducer 是需要实现特殊接口的类(本质上只需要一个函数,因为不需要保存状态。但在 Java 老版本中,函数不是一等公民,所以需要一个类来包裹);在 MongoDB 和 CouchDB 中,mapper 和 Reducer 是 JavaScript 函数。

图 10-1 中展示了 Hadoop MapReduce 任务中的数据流。其并行是基于分片的的:任务的输入通常是 HDFS 中的一个文件夹,输入文件夹中的每个文件或者文件块是一个可被 Map 子任务(task)处理的分片。

Untitled

每个输入文件通常有数百 M,每个输入通常有多个副本,分散在多个机器上。MapReduce 的调度器(图中没有显示)在调度时,会在这多个副本所在机器上选择一个具有足够内存和 CPU 资源运行该 Mapper 任务的机器,将 map 任务调度过去。这个策略也被称为:将计算调度到数据上。从而省去在网络中拷贝数据的环节,提高了局部性,减少了网络带宽消耗。

多数情况下,应用层的代码通常不会存在于 map 任务调度到的机器上。因此,MapReduce 框架首先会将用户代码(如对于 Java 来说就是 Jar 包)序列化后复制过去。然后在对应机器上,动态加载这些代码,继而执行 map 任务。读取输入文件,逐个解析数据记录(record),传给 Mapper 回调函数执行。每个 Mapper 会产生一组 key-value 对。

reduce 侧的计算也是分片的。对于 MapReduce 任务来说,map 任务的数量,取决于该任务的输入文件数(或者文件 block 数)的数量;但 reduce 任务的多少,可以由用户显式的配置(可以不同于 map 任务的数量)。为了保证所有具有相同 key 的 kv 对被同一个 Reducer 函数处理,框架会使用哈希函数,将所有 Mapper 的输出的 kv 对进行分桶(桶的数量就是 Reducer 的数量),进而路由到对应的 Reducer 函数。

根据 MapReduce 的设定,reducer 接受的 kv 对需要是有序的,但任何传统的排序算法都无法在单机上对如此大尺度的数据进行排序。为了解决这个问题,mapper 和 Reducer 间的排序被分成多个阶段

首先,每个 map 任务在输出时,会先将所有输出哈希后分片(一个分片对应一个 reducer),然后在每个分片内对输出进行排序。由于每个分片的数据量仍然可能很大,因此使用外排算法,类似于 SSTable 和 LSM-Trees 一节中讨论的 append+compact 算法。

当某个 Mapper 任务读取结束,并将输出排好了序,MapReduce 调度器就会通知所有 reducers 来该 Mapper 机器上拉取各自对应的输出。最终,每个 Reducer 会去所有 Mapper 上拉取一遍其对应分片数据数据。这里有个推还是拉的设计权衡,拉的好处在于 reducuer 失败后,可以很方便地进行重试,再次拉取计算即可。

这个分片(partitioning by reducer)-排序(sorting)-复制(coping)过程也被称为数据重排shuffle,虽然英文是洗牌的意思,但该过程并没有任何随机性,都是确定的)。

框架会在 Reducer 处将所有从 Mapper 处拿来的 kv 文件进行归并排序,然后在所有数据拉取完毕后,将排好序数据送给 reducer。这样一来,不同 Mapper 产生的具有相同 key 的记录就会被聚集到一块。

总结来说,map 和 reduce 间的排序分为两个阶段

  1. 在每个 Mapper 上对输出分片后各自排序。
  2. 在每个 Reducer 上对输入(有序文件)进行归并排序。

Reducer 在调用时会传入一个 key 一个 Iterator(迭代器),使用该迭代器能够访问所有具有相同 key 的记录(极端情况下,内存可能放不下这些记录,因此是给一个迭代器,而非内存数组)。reducer 函数可以使用任意的逻辑对这些记录进行处理,并可以产生任意数量的输出。这些输出最终会被写到分布式文件系统中的文件里(通常该输出文件会在 Reducer 机器上放一个副本,在另外一些机器上放其他副本)。

MapReduce 工作流

使用单个 MapReduce 任务能够解决的问题十分有限。回到本章一开始提到的日志分析的例子中,仅使用一个 MapReduce 任务可以算出每个 URL 被访问的次数,但无法从其中挑出最受欢迎的几个,因为后者还需要一轮额外的排序。

因此,将多个 MapReduce 首尾相接(前面任务的输出作为后面任务的输入)地串成工作流(workflow)极为常见。Hadoop MapReduce 框架本身没有提供任何关于工作流的支持,因此通常依赖文件夹名进行隐式的链式调用

  1. 第一个 MapReduce 任务将其输出写入特定的文件夹。
  2. 第二个 MapReduce 任务读取这些文件夹中文件作为输入。

但从 MapReduce 框架的角度来看,这是两个独立的任务。

因此,链式调用的 MapReduce 任务不太像多个 Unix 命令组成的流水线(仅使用一小段缓冲区,就可以将数据流从一个命令的输出引向另一个命令的输入),而更像一组以文件为媒介进行链式调用的命令,即前一个命令将输出写入中间文件,后一个命令读取该文件作为输入。这种方式有利有弊,我们将会在“对中间结果进行物化”一节进行讨论。

仅当一个任务完全成功的执行后,其输出才被认为是有效的(也即,MapReduce 任务会丢掉失败任务的不完整输出)。因此,工作流中的任务只有在前一个任务成功结束后才能启动——即,前驱任务必须成功地将输出写入到对应文件夹中。为了处理多个任务间执行的依赖关系(比如 DAG 依赖),人们开发了很多针对 Hadoop的工作流调度框架,如 Oozie,Azkaban,Luigi,Airflow 和 Pinball。

在需要调度的任务非常多时,这些工作流管理框架非常有用。在构建推荐系统时,一个包含 50 到 100 个 MapReduce 的工作流非常常见。此外,在大型组织中,不同团队的任务相互依赖非常常见。在这些复杂的工作流场景中,借助工具十分必要。

很多基于 Hadoop 的高维工具:如 Pig,Hive,Cascading,Crunch 和 FlumeJava,也会对一组 MapReduce 任务,按照合适的数据流走向,进行自动地组合。

Reduce 侧的 Join 和 Group

我们在第二章的数据模型和查询语言的上下文中讨论过连接(Join),但并没有讨论 Join 是如何实现的,现在我们重新捡起这个话头。

在很多数据集中,一个记录和其他记录有关联(association)是一个很常见的现象:关系模型中的外键(foreign key),文档模型中的文档引用(document reference),图模型中的(edge)。在代码需要访问有关联的双方记录(引用记录和被引用记录)时,Join 是必须的。正如第二章所讨论的,denormalization 能够减少 Join 的需要,但并不能完全移除。

在数据库中,如果你只在少量的数据记录上进行查询,则数据库通常会使用索引来快速定位相关的数据记录。如果查询涉及 join,则需要对多个索引进行查找。然而,在 MapReduce 中并没有索引的概念——至少原始版本的 MapReduce 没有。

当一个 MapReduce 任务拿到一组输入文件时,会读取文件中的所有内容;在数据库中,这种操作称为全表扫描(full table scan)。如果你进项访问一小部分记录,相比索引查找,全表扫描操作会非常的重。然而,在分析型查询中(参见事务型还是分析型),针对一个非常大的数据集进行聚集性运算非常常见。在这种场景下,全盘扫描所有的输入还算是说的过去,尤其是你能在多机并行地进行处理。

本书中讨论的 join 多是最常见的 join 类型——等值 join(equi-joins),即有关联的两个记录在某个列(如 ID)上具有相同的值。有些数据库支持更一般化的 Join,如外连接,左外连接,右外连接,这里不展开讨论了。

当在批处理的上下文中讨论 Join 时,我们是想找到所有相关联的记录,而不仅仅是某一些记录。比如,我们会假定一个任务会同时针对所有用户进行处理,而不是仅仅查找某个特定用户的数据(特定用户的话,使用索引肯定更为高效)。

例子:用户行为数据分析

在批处理任务中,一个典型的任务如图 10-2 所示。左侧是在网站中记录的登录用户的行为事件(称为 activity events 或者 clickstream data),右侧是数据库中的用户信息。你可以认为这是运用星型建模(参见AP 建模:星状型和雪花型)的一个例子:事件日志是事实表,数据库中的用户表是其中的一个维度。

Untitled

一个分析任务通常会协同处理用户行为用户资料:如,如果用户资料中包含用户的年龄或生日信息,则可以分析出哪个网页在那个年龄段最受欢迎。但是,用户行为事件中一般只会包含用户的 ID,而非所有用户资料信息。将每个用户的资料信息都嵌入到每个用户事件中进行输出,会使冗余度变得非常高。因此,需要将行为事件和用户资料进行 Join 。

最简单的方法,是对行为事件中的每一个事件所包含的用户 ID,都去用户数据库中(存在远程服务器上)进行一次查询。这种方法虽然可行,但性能极差::

  1. 不做任何优化,则数据处理带宽会受制于与用户数据库通信开销
  2. 如果使用缓存,本地缓存的有效性受制于行为事件数据中用户 ID 的分布
  3. 如果使用并发,则大量的并发查询很可能把数据库打垮

在批处理中为了获取足够好的性能,需要把计算尽可能的限制在机器本地。如果对于某个待处理记录,都要进行随机的网络访问,性能将会非常差。此外,不断地查询远程数据库也会导致数据库处理的不确定性,毕竟在你的多次查询间,数据库的数据可能会发生变化。

因此,一个更好的方式是将所需数据库数据的一个副本拿到(如,从数据库的备份中通过 ETL ,Extract-Transform-Loading 的方式导入)用户行为数据所在的分布式文件系统。于是,用户资料在 HDFS 中的一些文件中,用户行文在 HDFS 的另外一些文件中,此时就可以使用 MapReduce 任务来关联两者,进行分析。

基于排序-合并的 Join

让我们回顾下 Mapper 的职责:从所有输入记录中提取 key 和 value。对于 10-2 中的例子来说,key 就是用户 ID:我们使用一些 mappers 从用户行为事件中提取用户 ID 和网页,使用另一些 mappers (两组 mappers 从属于同一个 MapReduce 任务)从用户资料信息中提取其他信息(如用户 ID 作为 key,用户生日作为 value)。如下图:

Reduce 会按 key 进行排序,自动将多个源 Join

当 MapReduce 框架将所有 Mapper 的输出按照 key(也就是用户 ID)进行排序后,所有具有同样的用户 ID 的记录就会聚集到一块,作为输入给到 reducer。MapReduce 任务甚至可以将输出进行特殊组织,以使 Reducer 先看到同一个用户的资料信息,再看到其行为信息——这种技术也被称为二级排序(secondary sort,使用多个字段进行排序)。

在此基础上,reducer 可以进行轻松的进行 join:reducer 函数会在每一个用户 ID 上进行调用,由于使用了二级排序,reducer 会先看到该用户的资料信息。在实现 Reducer 时,可以首先将用户资料信息(比如生日)保存在局部变量里,然后对其所有行为信息进行迭代,提取相关信息,输出 <viewed-url, viewed age in years> kv 对。之后可以再接一个 MapReduce 任务,对每个 url 访问用户的年龄分布进行统计,并按年龄段进行聚集。

由于 Reducer 会在单个函数里处理所有同一个 user ID 的记录,因此一次只需要在内存中保存一个用户的资料信息,并且不用进行任何网络请求。这种算法也被称为基于排序和归并的连接(sort-merge join),由于 Mapper 的输出是按 key 有序的,则 reducers 可将来自多方的同一个 key 的输入轻松的进行合并。

将相关数据聚到一块

在排序-归并 join 中,mappers 和排序会确保同一个用户 id 所有用于 join 必要输入会被放到一起:即作为一个输入给到某次 Reducer 中。预先让所有相关数据聚集到一起,可以让 Reducer 逻辑非常简单,并且可以仅使用单个线程,就能进行高吞吐、低耗存地执行。

我们可以从另外一种角度来理解这种架构:mapper 发消息给 reducer。当某个 Mapper 发出一个 key-value 对时,key 是投递地址,value 就是要投递的内容。尽管 key 在物理上仅是一个任意的字符串(而非像网络中的 IP 和端口号那样真的网络地址),但在逻辑上充当地址的作用:所有具有相同 key 的 kv 对都会被投递到同一个目的地(某个 Reducer 的调用处)。

MapReduce 编程模型,可以将计算的物理拓扑(将数据放到合适的机器上)与应用逻辑(当有了数据后就进行处理)解耦开来。这种解耦与数据库形成对比——在使用数据库的场景中,进行数据库连接(物理)通常藏在应用代码(逻辑)深处。由于 MapReduce 框架会处理所有网络通信细节,它也会让应用层代码免于关心部分失败(partial failure),如某些节点宕机:MapReduce 框架会透明的(应用代码无感)的对失败的子任务进行重试,而不会影响应用逻辑。

Group By

除了 joins,另外一种“将相关数据聚到一块”(bring related data to the same place)模式经典用法是,将所有记录按某些 key 进行分组(对应 SQL 中的 GROUP BY 子句)。首先将具有相同 key 的所有记录被分到一组,然后对这些分组分别执行某些聚集操作(aggregation),例如:

  • 统计每个分组中的记录数(如上例中的页面访问数,在 SQL 中对应 COUNT(*)
  • 将某个字段进行累加(在 SQL 中对应 SUM(fieldname)
  • 根据排序函数排序后取前 k 个记录

使用 MapReduce 实现 Group By 语义,最简单的方法是在 Mapper 中抽取 key 为待分组的 key。MapReduce 框架就会按照这些 key 将所有 Mapper 的输出记录进行分区和排序,然后按 key 聚集给到 reducer。本质上,使用 MapReduce 来实现 group 和 join ,逻辑是极为相似的。

分组的另外一个使用场景是:收集某个用户会话中的所有用户活动——也称为会话化(sessionization)。例如,可以用来对比用户对于新老版本网站的分别购买意愿(A/B 测试)或者统计某些市场推广活动是否起作用。

假设你的 web 服务架设在多台服务器上,则某个特定用户的活动日志大概率会分散在不同服务器上。这时,你可以实现一个会话化的 MapReduce 程序,使用会话 cookie、用户 ID或者其他类似的 ID 作为分组 key,以将相同用户的所有活动记录聚集到一块、并将不同用户分散到多个分区进行处理。

处理偏斜(skew)

如果某个 key 的数据量超级大,则“将相同 key 的数据聚集到一块” 的模型将不再适用。例如,在社交网络中,绝大多数的人都只会连接到较少的其他人,但数量较少的名人会有高达数百万的关注者。数据库中这种不成比例的记录常被称为关键对象linchpin objects )或者热键hot keys)。

在单个 Reducer 中收集处理名人(celebrity)所有的活动事件(比如他们发布信息的回复),可能会造成严重的数据倾斜skew,有时也被称为热点,hot spots)——即,一个 Reducer 处理的数据量远超其他(参见负载偏斜和热点消除)。由于只有其所属的所有 mappers 和 reducers 执行完时,该 MapReduce 任务才算完成,该 MapReduce 之后的任何任务都需要等待最慢的 Reducer (长尾任务)完成后才能启动。

如果某个 join 的输入存在热点数据,你可以借助一些算法来进行缓解。例如,Pig 中的偏斜 join(skewed join)方法会事先对所有 key 的分布进行采样,以探测是否有热点 key。然后,在执行真正的 Join 时,对于 Join 有热点 key 的这一测,mapper 会将含有热点 key 的记录发送到多个 reducer(每次随机挑选一个,相比之下,常规的 MapReduce 只会根据 key 的哈希确定性的选择一个 reducer);对于 Join 的另一侧输入,所有包含热点 key 的相关记录需要每个给每个具有该 key 的 Reducer 都发一份。

该技术将处理热点 key 的工作分摊到多个 Reducer 上,从而可以让其更好的并行,当然代价就是需要将 join 的非热点侧的数据冗余多份。Crunch 中的分片连接(shared join)也使用类似的技术,但需要显式地指定热点 key,而非通过采样来自动获取。这种技术很像我们在负载偏斜和热点消除中讨论过的相关技术,在多分片数据中,使用随机分片的方法来消除热点。

Hive 的偏斜连接(skewed join)采用了另外一种方法来进行优化。Hive 要求在表的元信息中显示的指出热点 key,在收到这些 key 时会将其存到单独文件中。在对这种表进行 join 时,会使用 map 侧的 join(见下一小节)来处理热点 key。

当对热点 key 进行分组聚集(group)时,可以将分组过程拆成两个阶段,即使用两个相接的 MapReduce。第一个 MapReduce 会将记录随机得发给不同的 reducer,则每个 Reducer 会对热点 key 的一个子集执行分组操作,并且产生一个更为紧凑的聚合值(aggregated value,如 count,sum,max 等等)。第二个 MapReduce 操作会将第一阶段中 MapReduce 产生的同一个 key 的多个聚合值进行真正的归并。总结来说,就是第一阶段进行预分组,减小数据量;第二阶段真正的全局分组,可以想象这种方式,要求聚合操作满足交换律和结合律

Map 侧的连接

上一节讲到的 join 算法是在 reduce 阶段真正执行的 join 逻辑,因此也被称为 reduce 侧连接reduce-side join)。其中,mapper 仅扮演准备数据的角色:从每个输入记录中提取 key 和 value,并且将每个 kv 对发给合适的 Reducer 分区,并将其进行排序。

reduce 侧的连接的好处在于,你不需要对输入数据有任何的假设:不管输入数据具有怎样的属性和结构,mappers 都可以进行合适的预处理后送给 reducers 进行连接。然而,缺点在于排序、复制到 reducers、将 Reducer 的输入进行合并等过程代价十分高昂。根据可用内存缓存大小不同,数据在流经 MapReduce 中各阶段时可能会被写入多次(写放大)。

但如果,输入数据满足某种假设,就可以利用所谓的 map 侧连接(map-side join)进行更快的连接。这种方式利用了一种简化过的 MapReduce 任务,去掉了 reducer,从而也去掉了对 Mapper 输出的排序阶段。此时,每个 Mapper 只需要从分布式文件系统中的输入文件块中读取记录、处理、并将输出写回到文件系统,即可。

广播哈希连接

使用 map 侧连接的一个最常见的场景是一个大数据集和一个小数据集进行连接时。此种情况下,小数据集需要小到能全部装进 Mapper 进程所在机器的内存。

如,设想在图 10-2 对应的场景中,用户资料数据足够小,能够装入内存。在这种情况下,当 Mapper 启动时,可以先将用户资料分布式文件系统中读取到内存的哈希表中。一旦加载完毕,mapper 可以扫描所有的用户活动事件,对于每一个事件,在内存哈希表中查找该事件对应用户资料信息,然后连接后,输出一条数据即可。

但仍然会有多个 Mapper 任务:join 的大数据量输入侧(在 10-2 中,用户活动事件表是大输入侧)每个文件块一个 mapper。其中 MapReduce 任务中的每个 Mapper 都会将小输入侧的数据全部加载进内存。

这种简单高效的算法称为广播哈希连接(broadcast hash joins):

  1. 广播(broadcast):处理大数据侧每个分片的 Mapper 都会将小数据侧数据全部载入内存。从另外一个角度理解,就是将小数据集广播到了所有相关 Mapper 机器上。
  2. 哈希(hash):即在将小数据集在内存中组织为哈希表。

Pig(replicated join)、Hive(MapJoin)、Cascading 和 Crunch 都支持这种连接方法。

分区哈希连接

如果待 join 的多个输入,能够以同样的方式进行分区,则每个分区在处理时可以独立地进行 join。仍以图 10-2 为例,你可以重新组织活动事件和用户信息,都将其按用户 ID 的最后一位进行分片(则每侧输入都会有十个分片)。例如,mapper-3 首先将具有以 3 结尾的 ID 的用户资料数据加载到内存哈希表中,然后扫描所有以 3 结尾的 ID 活动事件记录,进行连接。

如果分区方式正确,则所有需要连接的双方都会落到同一个分区内,因此每个 Mapper 只需要读取一个分区就可以获取待连接双方的所有记录。这样做的好处是,每个 Mapper 所需构建哈希表的数据集要小很多(毕竟被 partition 过了)。

只有当 join 的连接输入:

  1. 具有同样的分区数目
  2. 使用同样的 key 进行分区
  3. 以同样的哈希函数进行分区

如果之前的 MapReduce 已经做了上述分组,则按分区进行连接就顺其自然。

在 Hive 中,分区哈希连接也被称为分桶 map 侧连接( bucketed map join)。

Map 侧合并连接

map 侧连接的另一个变种是,当 map 的输入数据集不仅以相同的方式分片过了,而且每个分片是按该 key 有序的。在这种情况下,是否有足够小的、能够载入内存的输入已经无关紧要,因为 Mapper 可以以类似普通 Reducer 的方式对输入数据进行归并:都以 key 递增(都递减也可以,取决于输入文件中 key 的顺序)的顺序,增量式(迭代式)的读取两个输入文件,对相同的 key 进行匹配连接(比如对每个输入使用一个指针,进行滑动匹配即可)。

如果我们可以在 map 侧进行归并连接,说明前一个 MapReduce 的输入已经将文件分好了组、排好了序。原则上,在这种情况下,join 完全可以在前一个 MapReduce 的 reduce 阶段来做。然而,使用额外的一个 MapOnly 任务来做连接也是有适用场景的,比如分区且有序的文件集还可以被其他任务复用时。

使用 map 侧连接的 MapReduce 工作流

当 MapReduce 连接的输出为下游任务所用时,会因为 map 侧还是 reduce 侧的不同而影响输出结构。使用 reduce 侧的连接时,输出会按照 join key 进行分区和排序;使用 map 侧的连接时,输出会和较大数据量输入侧顺序一致(不论是使用 partitioned join 还是 broadcast join,对于大数据量侧,都是一个文件块对应一个 map 子任务)。

如前所述,使用 map 侧的 join 时,会对输入数据集的尺寸、有序性和分区性有更多的要求。事先知道输入数据集在分布式文件系统上的分布对优化 join 策略至关重要:只是知道文件的编码格式和文件是否有序是不够的;你必须进一步知道输入文件的分区数量,以及文件中的数据是按哪个字段进行分区和排序的。

在 Hadoop 生态中,这些如何分区的元信息通常会在 HCatalog 和 Hive 元信息存储中维护

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-12-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 木鸟杂记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MapReduce 任务执行
    • MapReduce 的分布式执行
      • MapReduce 工作流
      • Reduce 侧的 Join 和 Group
        • 例子:用户行为数据分析
          • 基于排序-合并的 Join
            • 将相关数据聚到一块
              • Group By
                • 处理偏斜(skew)
                • Map 侧的连接
                  • 广播哈希连接
                    • 分区哈希连接
                      • Map 侧合并连接
                        • 使用 map 侧连接的 MapReduce 工作流
                        相关产品与服务
                        云 HDFS
                        云 HDFS(Cloud HDFS,CHDFS)为您提供标准 HDFS 访问协议,您无需更改现有代码,即可使用高可用、高可靠、多维度安全、分层命名空间的分布式文件系统。 只需几分钟,您就可以在云端创建和挂载 CHDFS,来实现您大数据存储需求。随着业务需求的变化,您可以实时扩展或缩减存储资源,CHDFS 存储空间无上限,满足您海量大数据存储与分析业务需求。此外,通过 CHDFS,您可以实现计算与存储分离,极大发挥计算资源灵活性,同时实现存储数据永久保存,降低您大数据分析资源成本。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档