前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubernetes中从头开始构建MapReduce

Kubernetes中从头开始构建MapReduce

作者头像
云云众生s
发布2024-05-01 10:19:02
970
发布2024-05-01 10:19:02
举报
文章被收录于专栏:云云众生s云云众生s

译自 MapReduce from Scratch,作者 Michal Pitr。

在过去的几周里,我一直在从头开始构建 MapReduce。

这将是一篇很长的文章:我们将了解分布式计算的必要性,重新发现为什么 MapReduce 是对许多问题进行建模的自然方式,构建我们自己的版本,了解各个部分如何组合在一起,并用它解决一个实际问题!

动机问题

假设我们想计算海量数据集中的词频。我们尝试在单台机器上处理它,但发现需要一个月以上的时间。我们能做什么?

第一个想法应该是获得一台更快的机器,但它可能不存在或太昂贵。相反,让我们看看如何将问题分布到 N 台商品机器上。我们希望有一种简单的方法来使用简单的查询查找任何单词的频率,即 grep over a file。

让我们首先将数据集拆分为 N 个分区,并使用不同的机器计算每个子集的词频。这已经给了我们 Nx 的加速,减去了一些固定的开销。为了合并结果,我们可以添加一台最终机器,它获取这 N 个部分结果文件并对相应单词的频率求和。

这与 MapReduce 背后的核心思想相差不远。上述过程有两个主要步骤:首先,我们将单词映射到输入数据子集中的频率,然后减少中间结果以获得最终答案。

请注意,这是非常通用的,想象一下我们有一个大型照片数据集,我们希望对其进行分类:我们可以将图像分类任务作为映射操作,然后在归约阶段将具有相同类别的图像分组。

另一个观察结果是,映射部分通常是两个部分中更昂贵的阶段,因此,通常映射器比归约器多。

希望已经让你相信 MapReduce 是一个合理的想法,让我们看看 MapReduce 论文如何解决词频问题。

使用 MapReduce

下面,我从 论文中复制粘贴了 WordCounter MapReduce 程序。让我们看看它是如何工作的。稍后,当我们实现我们的版本时,我们的目标是保持使用语义相同。

代码语言:javascript
复制
#include "mapreduce/mapreduce.h"

// User’s map function
class WordCounter : public Mapper
{
public:
    virtual void Map(const MapInput &input)
    {
        const string &text = input.value();
        const int n = text.size();
        for (int i = 0; i < n;)
        {
            // Skip past leading whitespace
            while ((i < n) && isspace(text[i]))
                i++;
            // Find word end
            int start = i;
            while ((i < n) && !isspace(text[i]))
                i++;
            if (start < i)
                Emit(text.substr(start, i - start), "1");
        }
    }
};
REGISTER_MAPPER(WordCounter);

// User’s reduce function
class Adder : public Reducer
{
    virtual void Reduce(ReduceInput *input)
    {
        // Iterate over all entries with the
        // same key and add the values
        int64 value = 0;
        while (!input->done())
        {
            value += StringToInt(input->value());
            input->NextValue();
        }
        // Emit sum for input->key()
        Emit(IntToString(value));
    }
};
REGISTER_REDUCER(Adder);

int main(int argc, char **argv)
{
    ParseCommandLineFlags(argc, argv);
    MapReduceSpecification spec;
    
    // Store list of input files into "spec"
    for (int i = 1; i < argc; i++)
    {
        MapReduceInput *input = spec.add_input();
        input->set_format("text");
        input->set_filepattern(argv[i]);
        input->set_mapper_class("WordCounter");
    }
    
    MapReduceOutput *out = spec.output();
    out->set_filebase("/gfs/test/freq");
    out->set_num_tasks(100);
    out->set_format("text");
    out->set_reducer_class("Adder");
    
    // Tuning parameters: use at most 2000
    // machines and 100 MB of memory per task
    spec.set_machines(2000);
    spec.set_map_megabytes(100);
    spec.set_reduce_megabytes(100);
    
    // Now run it
    MapReduceResult result;
    if (!MapReduce(spec, &result))
        abort();
    // Done: ’result’ structure contains info
    // about counters, time taken, number of
    // machines used, etc.
    return 0;
}

用户程序有 3 部分:map 函数、reduce 函数和配置。大部分繁重的工作由导入的 mapreduce 库处理。

map 函数将输入文本拆分为单词并发出键值对。例如,“the quick brown fox jumps over the lazy dog” 映射到以下键值对:[“the”:1, “quick”:1, “brown”:1, “fox”:1, “jumps”: 1, “over”:1, “the”:1, “lazy”:1, “dog”:1]。请注意,对 (“the”: 1) 发射了两次。

reduce 函数按键操作。例如,给定输入 [“the”:1, “the”:1”, “the”:1],它将发出 (“the”:3)。

配置处理输入输出、格式以及可用于 MapReduce 作业的资源数量。

在不到 100 行代码中,我们可以通过利用 1000 台机器来解决单词计数问题!这非常简洁,展示了 MapReduce 作为一种抽象的强大功能。

收集需求

本文将成为我们的灵感,让我们开始勾画实施要求:

  • 用户导入我们的 MAPREDUCE 库,执行 MAP 和 REDUCE 函数,并提供配置。
  • 当用户执行二进制文件时,它将被复制到多台机器上,在这些机器上以 Mapper 或 Reducer 的模式执行。
  • 这些机器必须有权访问输入数据。
  • Mapper 的输出必须对 Reducer 可用。
  • 用户可以访问最终结果。

基础设施

当我开始研究这项工作时,这些需求提出了两个主要未知:如何将二进制文件分发给其他计算机以及如何向它们提供输入数据。

经过一番研究,我决定在我的输入数据上托管一个网络存储服务器——我选择 NFS。我们可以将网络目录挂载到每台计算机,并允许计算机对该目录进行读写。它可能不像 Google 文件系统或 HDFS 那样具有高性能或高可扩展性,但它可以满足我们的目的。

为了将二进制文件分发到多台计算机,执行并监控其状态,我们可能会通过 SCP 复制二进制文件、通过 SSH 登录到计算机、执行二进制文件,然后等待其完成。我们甚至可以实施运行状况 ping 和进度报告。

不过,在我闲置一段时间后,我意识到这听起来非常像集群编排,并决定利用 Kubernetes 让我的生活更轻松。我们可以将二进制文件构建为 docker 镜像,并将其作为 Kubernetes 任务执行。通过持久卷和持久卷声明,可以轻松地将 NFS 与 Kubernetes 集成。不用担心,您不必理解这些概念,但我提到它们是为了完整起见。

由于我是出于教育目的开发这个概念,所以我使用 minikube 在我的笔记本电脑上启动一个本地虚拟集群。我在集群外将我的 NFS 服务器作为一个 docker 容器运行,并通过 docker 网络将其连接到该集群。此设置的优点是它可以轻松迁移到多个物理或虚拟机。

在基础架构到位的情况下,让我们开始编写我们的MapReduce框架!

使用我的 MapReduce

首先,我们将探讨如何使用我的 MapReduce 实现解决字数统计问题。然后,我们将深入研究该实现,以了解各个组件的功能。

正如我承诺的,我试图保持与原始论文相似的用法语义。使用我的 Go 实现比较此 MapReduce 程序和我们之前看到的 C++ 示例!

代码语言:javascript
复制
// main.go

import (
  …

  "github.com/MichalPitr/map_reduce/pkg/config"
  "github.com/MichalPitr/map_reduce/pkg/interfaces"
  "github.com/MichalPitr/map_reduce/pkg/mapreduce"
)

type WordCounter struct {
  wordRegex *regexp.Regexp
}

func (wc *WordCounter) Map(input interfaces.MapInput, emit func(key, value string)) {
  text := input.Value()
  text = strings.ToLower(text)
  words := wc.wordRegex.FindAllString(text, -1)
  for _, word := range words {
    emit(word, "1")
  }
}

type Adder struct{}

func (a *Adder) Reduce(input interfaces.ReducerInput, emit func(value string)) {
  val := 0
  for !input.Done() {
    num, err := strconv.Atoi(input.Value())
    if err != nil {
      log.Printf("Failed converting input to integer, skipping: %q", input.Value())
      input.NextValue()
      continue
    }
  val += num
  input.NextValue()
  }
  emit(strconv.Itoa(val))
}

func main() {
  cfg := config.SetupJobConfig()
  log.Printf("cfg: %+v", cfg)
  cfg.NumReducers = 2
  cfg.NumMappers = 4


  cfg.Mapper = &WordCounter{wordRegex: regexp.MustCompile(`\b\w+\b`)}
  cfg.Reducer = &Adder{}

  mapreduce.Execute(cfg)
}

让我们花点时间理解一下我的解决方案是如何在幕后工作的。我们可以参考该论文中一张方便的图表来说明这一点。

用户的程序在三个模式下执行,分别是 Master、Mapper 和 Reducer,在不同的机器上执行。在高层次上,master 处理整个作业编排,mappers 对输入文件执行昂贵的 map 操作,reducers 联合来自 mappers 的中间结果。在我的解决方案中,模式由 CLI 标志决定,例如 --mode=master 代表 master 模式。

Master

主模式将输入文件分割成子集、准备 NFS 目录、启动带有已分配文件的映射器作业,并等待它们完成。然后,针对还原器重复此过程。主函数非常简单,我可以将其列出如下:

代码语言:javascript
复制
// /pkg/master/master.go

func Run(cfg *config.Config) {
  clientset := createKubernetesClient()
  numNodes := getNumberOfNodes(clientset)
  mustValidateConfig(cfg, numNodes)

  jobId := fmt.Sprintf("job-%s", time.Now().Format("2006-01-02-15-04-05"))
  log.Printf("Running master: %s", jobId)
	
  mustCreateJobDir(cfg.NfsPath, jobId)
  fileRanges := partitionInputFiles(cfg.InputDir, cfg.NumMappers)

  t0 := time.Now()
  launchMappers(cfg, clientset, jobId, fileRanges)
  waitForJobsToComplete(clientset, jobId, "mapper")
  log.Printf("Mappers took %v to finish", time.Since(t0))

  t1 := time.Now()
  launchReducers(cfg, clientset, jobId)
  waitForJobsToComplete(clientset, jobId, "reducer")
  log.Printf("Reducers took %v to finish", time.Since(t1))
  log.Printf("Total runtime: %v", time.Since(t0))
}

让我们来理解 launchMappers 函数。它为每个映射器创建 Kubernetes 作业。作业规范指定:

  • 包含我们的二进制文件的 Docker 映像。
  • mapper 必需的 CLI 参数:mapper 模式、输入/输出目录和要处理的文件。
  • 如何装载 NFS 存储。

等等……Docker镜像?我们从未创建过!当我们最终运行我们的解决方案时我会更详细地展示这一点,但 Kubernetes 围绕容器,所以我们对我们的二进制文件进行 docker 化,以便 Kubernetes pod 可以下载该二进制文件。

在 launchMappers 创建 Mappers 作业后,Kubernetes会将作业分配给群集内的节点,启动容器,并监视其状态。现在我们可以运行 Mappers 了,是时候了解它们的工作原理了!

Mappers

在 map 模式中,二进制文件逐行扫描输入文件,并将该行传递给用户的 map 函数。Map 处理文本,并使用注入的 emit 函数发出键值对。

在我的实现中,emit 函数非常简单 - 它将键值对存储在一个动态列表的 map 中。更完整的实现将进行一些缓冲,然后将缓冲区刷新到磁盘。

当 mapper 完成所有输入的处理后,它将已排序的键值对保存到 NFS 存储中的中间文件中,reducer 将从该中间文件中读取这些键值对进行最终处理。让我们在这里缩小视野,看看从中间文件到 reducer 的这种映射如何工作。

我们希望按照键为中间文件分区,这样所有相同的键都由一个 reduce 任务处理。例如,”brown“和“the”出现在 Mapper 1 和 Mapper 2 中间文件。我们希望确保每个最终由同一个 reduce 任务处理。我们来看看一个例子。

为了实现这一点,当保存 mappers 中的中间结果时,我们根据 reducers R 的数量使用公式对键进行分区

例如,使用 FNV 哈希和 R = 2,我们得到

(数学笔记:这可以解释为“1 与 FNV(brown) mod 2 同余”)。

基于该内容,我们将“brown”分配到第二个中间文件,该文件将由 reducer 2 处理。请注意在上面的示例中,所有“the”都落入蓝色文件中,而所有“brown”都落入红色文件中!

这总结了关于 mappers 的讨论——接下来,让我们看看 reducer 如何工作。

Reducers

如下先前所突出显示,还原程序的工作是从分配的中介文件中读取键值对,然后使用用户定义的还原函数来处理它们。

有两件事我们可以确信:中介文件中的键按键排序,如果某中介文件中存在键 A,则我们可以保证键 A 不会出现在分配给其他 reducers 程序的任何文件中。

我尝试以更聪明的方式实现我的 reducers 程序,并避免将所有中介文件加载到内存中。这会带来一个有趣的算法问题:

假设我们要处理 3 个中介文件,一次处理一个键值对,而无需将所有内容加载到内存中。

我们可以借助最小堆即时合并键值对!我们将来自每个文件的第一个键值对加载到堆中。每当我们从堆中弹出数据时,我们从对应文件中读取下一行并将其推入堆中。这为我们提供了一种高效内存读取键值对流的方式!你可以在此处找到实现。

处理完所有中间文件后,Reducer 将结果保存在 NFS 存储中。

例如,上述示例将产生: [“abby”: 1, “alice”: 3, “bob”: 2, “car”:2, “dog”:1, …]。

MapReduce 论文提出了我已经在我的实现中跳过的几个额外的优化。聪明的读者可能已经能够提出一些优化 - 例如,我们可以在 mapper 中选择性地进行一些缩减,不是吗?

恭喜你,你现在对 MapReduce 有一个非常透彻的了解!最困难的部分已经结束了!让我们将它们组合在一起,并使用我们的 MapReduce 来统计 Project Gutenberg 上最流行的 100 本图书中的单词频率!

MapReduce 实战

我下载了 Project Gutenberg 上最流行的 100 本书到 /mnt/nfs/input/,使用的是这个脚本。书本的文件被标记为 book-0 到 book-99。我还创建了一个可以访问我的 NFS 存储的 4 节点 minikube 集群。

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get nodes
NAME            STATUS   ROLES           AGE    VERSION
multinode       Ready    control-plane   27d    v1.28.3
multinode-m02   Ready    <none>          7d2h   v1.28.3
multinode-m04   Ready    <none>          7d1h   v1.28.3
multinode-m05   Ready    <none>          7d1h   v1.28.3

我将重用前面展示的 Word-Frequencies Go 主文件。我们必须使用此 Dockerfile 准备 Docker 镜像,并将其推送到我的注册表。mapper 和 reducer 节点将提取此镜像来运行我们的工作负载。

代码语言:javascript
复制
docker build -t michalpitr/mapreduce:latest .
docker push michalpitr/mapreduce:latest

现在我们只需要在本地运行 master!

代码语言:javascript
复制
go build .
./map_reduce --mode=master --image=michalpitr/mapreduce:latest --input-dir /mnt/input/ --nfs-path /mnt/nfs/

让我们检查执行日志。

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:~/code/map_reduce$ ./map_reduce --mode=ma
ster --image=michalpitr/mapreduce:latest --input-dir /mnt/nfs/input/ -
-nfs-path /mnt/nfs/
2024/04/28 17:13:53 Running master job-2024-04-28-17-13-53
2024/04/28 17:13:53 Creating mapper-0 for book-0-30
2024/04/28 17:13:53 Creating mapper-1 for book-31-53
2024/04/28 17:13:53 Creating mapper-2 for book-54-76
2024/04/28 17:13:53 Creating mapper-3 for book-77-99
2024/04/28 17:13:53 Waiting for jobs to finish.
2024/04/28 17:14:03 Waiting for jobs to finish.
2024/04/28 17:14:13 Waiting for jobs to finish.
2024/04/28 17:14:23 All jobs completed.
2024/04/28 17:14:23 Mappers took 30.1843218s to finish
2024/04/28 17:14:23 Creating reducer-0
2024/04/28 17:14:23 Creating reducer-1
2024/04/28 17:14:23 Waiting for jobs to finish.
2024/04/28 17:14:33 All jobs completed.
2024/04/28 17:14:33 Reducers took 10.106014703s to finish
2024/04/28 17:14:33 Total runtime: 40.290376921s

整个作业花费了 40 秒。我们在此处并未期望出现任何疯狂的情况,毕竟,我是在一台动力不足的笔记本电脑上运行此作业,而不是一个真正的集群。

让我们简单了解一下。master 创建了 4 个 mappers,每个 mappers 都分配了一个书籍范围。

使用kubectl,我们可以看到集群正在创建 4 个容器。这是集群拉取我们之前推送的镜像的时候!

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME             READY   STATUS              RESTARTS   AGE
mapper-0-zgz24   0/1     ContainerCreating   0          11s
mapper-1-m824k   0/1     ContainerCreating   0          11s
mapper-2-ctqqt   0/1     ContainerCreating   0          11s
mapper-3-l2ddh   0/1     ContainerCreating   0          11s

这些容器启动并完成运行所需的时间并不长。

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME             READY   STATUS      RESTARTS   AGE
mapper-0-zgz24   1/1     Running     0          18s
mapper-1-m824k   0/1     Completed   0          18s
mapper-2-ctqqt   0/1     Completed   0          18s
mapper-3-l2ddh   0/1     Completed   0          18s

master 定期轮询 Kubernetes API 服务器以获取这些作业的状态。一旦所有作业都完成,它就会启动归约器。

在进入归约器之前,让我们看看其中一个映射器生成的中间文件:

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53/mapper-3$ ls
partition-0  partition-1

检查分区-1 的末尾,显示了陀思妥耶夫斯基的《地下室手记》中一个角色的键值对。到目前为止,一切都好!

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53/mapper-3$ tail partition-1
zverkov,1
zverkov,1
zverkov,1
zverkov,1
zverkov,1
…

接下来,master 启动归约器,

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME              READY   STATUS              RESTARTS   AGE
…
reducer-0-4xt58   0/1     ContainerCreating   0          2s
reducer-1-xvhfk   0/1     ContainerCreating   0          2s

它们很快完成。

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs$ kubectl get pods
NAME              READY   STATUS      RESTARTS   AGE
…
reducer-0-4xt58   0/1     Completed   0          7s
reducer-1-xvhfk   0/1     Completed   0          7s

就是这样 - 我们刚刚使用我们自己的 MapReduce 框架计算了单词频率!让我们看看我们可以从结果中学到什么!

我可以在作业文件夹的根目录中看到输出文件:

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ ls
…  reducer-0  reducer-1

让我们使用 grep 来查找一些单词的频率。

代码语言:javascript
复制
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w zverkov reducer-0 reducer-1
reducer-1:zverkov,112

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w the reducer-0 reducer-1
reducer-1:the,822175

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w mapreduce reducer-0 reducer-1
michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ 

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w map reducer-0 reducer-1
reducer-1:map,188

michal@michal-ThinkPad-T490s:/mnt/nfs/job-2024-04-28-17-13-53$ grep -w reduce reducer-0 reducer-1
reducer-0:reduce,123

遗憾的是,在古腾堡计划最流行的 100 本书中没有提到 MapReduce……也许有一天?

最后一点,请注意这些输出文件如何按键对结果空间进行分区。当我们查找一个单词时,它只存在于一个文件中!这几乎就像我们做对了什么!

如果您已经读到这里,您不妨查看GitHub 仓库。您已经熟悉SQLite 存储格式内部。研究和撰写这些文章需要大量时间和精力。为确保您不会错过下一篇文章,请考虑订阅或关注我。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2024-04-302,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 动机问题
  • 使用 MapReduce
  • 收集需求
  • 基础设施
  • 使用我的 MapReduce
  • Master
  • Mappers
  • Reducers
  • MapReduce 实战
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档