专栏首页容器计算Apache Beam的Docker Demo

Apache Beam的Docker Demo

1 Overview

参考文章: https://medium.com/@0x0ece/a-quick-demo-of-apache-beam-with-docker-da98b99a502a

Apache Beam 是什么?

Apache Beam 是统一的批/流数据处理的编程模型。本文主要是参考官方文档,用 Docker 来快速跑起来一个用 Beam 来构建的 Flink 程序来处理数据的 Demo。

2 Docker 部署 Flink & Beam

首先利用 Docker Compose 来将 Flink Cluster 跑起来。

git clone https://github.com/ecesena/docker-beam-flink.git
cd docker-beam-flink

然后大家可以看看文件夹的树状结构。

➜  docker-beam-flink git:(master) tree
.
├── LICENSE
├── README.md
├── base
│   ├── Dockerfile
│   └── supervisor.conf
├── beam-flink
│   ├── Dockerfile
│   └── config-flink-load-jar.sh
├── build.sh
├── docker-compose.yml
├── flink
│   ├── Dockerfile
│   ├── conf
│   │   ├── flink-conf.yaml
│   │   ├── log4j.properties
│   │   ├── logback-yarn.xml
│   │   ├── logback.xml
│   │   └── slaves
│   └── config-flink.sh
└── screenshots
    └── showplan.png

从文件结构看,项目中包含了三个 Dockerfile,其依赖的顺序可以是 base/Dockerfile -> flink/Dockerfile -> beam-flink/Dockerfile。

base 中的 Dockerfile 是 Ubuntu 的基础镜像,这里就不分析了。剩下的逐一分析一下,分析写在里 Dockerfile 里。

flink

FROM base

# add passless key to ssh
RUN ssh-keygen -f ~/.ssh/id_rsa -t rsa -N ''
RUN cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/*

## 安装 Flink 1.0.3 
RUN mkdir ~/downloads && cd ~/downloads && \
    wget -q -O - http://apache.mirrors.pair.com/flink/flink-1.0.3/flink-1.0.3-bin-hadoop26-scala_2.10.tgz | tar -zxvf - -C /usr/local/
RUN cd /usr/local && ln -s ./flink-1.0.3 flink

# 设置 Dockerfile 的环境变量
ENV FLINK_HOME /usr/local/flink
ENV PATH $PATH:$FLINK_HOME/bin

# 将 Flink 的一些配置放入镜像中
ADD conf/flink-conf.yaml /usr/local/flink/conf/
ADD config-flink.sh /usr/local/flink/bin/

# 设置配置脚本的权限
RUN chmod +x /usr/local/flink/bin/config-flink.sh

# 端口映射
EXPOSE 6123
EXPOSE 22

CMD ["/usr/local/flink/bin/config-flink.sh", "taskmanager"]

beam-flink

# 从依赖的 flink 镜像开始构建镜像
FROM flink

# 下载 beam-starter,可以先理解为一个预先写好的基于 Beam 的 Flink 作业
RUN curl -L https://github.com/ecesena/beam-starter/releases/download/v0.1/beam-starter-0.1.jar > /root/downloads/beam-starter-0.1.jar

# 下载一段文本文件
RUN curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt

# 将本地的文件复制到镜像的目录里
ADD config-flink-load-jar.sh /usr/local/flink/bin/

# Flink 上传 jar 包的脚本
RUN chmod +x /usr/local/flink/bin/config-flink-load-jar.sh

# 运行 taskmanager
CMD ["/usr/local/flink/bin/config-flink.sh", "taskmanager"]

以上 Dockerfile 其实很容易理解,就不赘述了。然后用 docker-compose 来运行 Flink。

docker-compose up -d

运行之后,可以看看 Docker 正在 Running 的容器就有了。

➜  docker-beam-flink git:(master) docker ps
CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                                             NAMES
2de232e58df8        dataradiant/beam-flink   "/usr/local/flink/bi…"   6 hours ago         Up 6 hours          6121-6123/tcp, 0.0.0.0:32768->22/tcp                                              docker-beam-flink_taskmanager_1
98b52be9c56e        dataradiant/beam-flink   "/usr/local/flink/bi…"   6 hours ago         Up 6 hours          6123/tcp, 0.0.0.0:220->22/tcp, 0.0.0.0:48080->8080/tcp, 0.0.0.0:48081->8081/tcp   docker-beam-flink_jobmanager_1

现在呢,我们基于上面的项目已经运行起来一个 Flink 集群,接下来,我们用 beam 的 Flink Runner 来跑起来一个 Flink 程序。

打开 Flink 的 Web UI,然后在 Submit new Job 去提交作业。

按照上图提示,提交的 jar 包是我们打镜像文件的时候打进去的。关于这个项目,我们可以先看看目录结构。

├── LICENSE
├── README.md
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── dataradiant
    │               └── beam
    │                   ├── App.java
    │                   └── examples
    │                       ├── StreamWordCount.java
    │                       └── WordCount.java
    └── test
        └── java
            └── com
                └── dataradiant
                    └── beam
                        └── AppTest.java

所以其实很容易理解,这个示例工程,其实就是基于 Beam 来创建的一个 Flink WordCount 程序而已。关于 WordCount 程序,核心代码如下。

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

// 选择 Flink 作为 Runner
options.setRunner(FlinkRunner.class);

// 创建数据处理的 Pipeline
Pipeline p = Pipeline.create(options);

p.apply("ReadLines", TextIO.Read.from(options.getInput()))
        // CountWords() ,其实就是计算词频的一个静态风法
        .apply(new CountWords())
        // 定义输出的格式
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

p.run();

3 Summary

本文就是一个具体的例子,展示了如何用 Beam 来构建 Flink 作业,并且用 Docker 来运行这个程序。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink 集群/任务容器化

    Flink Dockerfile 走读已经介绍了 Flink 的镜像应该如何构建了,接下来,本文解释一下如何利用 Docker 来部署 Flink。

    runzhliu
  • Flink job cluster on Kubernetes

    之前文章介绍了 Flink session cluster on Kubernetes,需要注意,这种部署方式,可以在同一个 Cluster 上多次提交 Fli...

    runzhliu
  • Flink Dockerfile 走读

    既然分析了 Dockerfile,那么也顺带分析一波 docker-entrypoint.sh 脚本都干了什么事。

    runzhliu
  • 妈妈再也不用担心,我学不会大数据 flink 啦

    面对霸气侧漏的业务需求,由于没有大数据知识储备,咱心里没底,咱也不敢问,咱也不敢说,只能静下来默默储备、默默寻觅解决方案。

    一猿小讲
  • 一文入门流处理开发

    Apache Flink 是一个开源的分布式流处理和批处理系统。Flink 的核心是在数据流上提供数据分发、通信、具备容错的分布式计算。同时,Flink 在流处...

    WindyQin
  • Flink1.7从安装到体验

    版权声明:欢迎转载,请注明出处,谢谢。 https://blog.csdn.net/boling_...

    程序员欣宸
  • Flink快速入门--安装与示例运行

    flink是一款开源的大数据流式处理框架,他可以同时批处理和流处理,具有容错性、高吞吐、低延迟等优势,本文简述flink在windows和linux中安装步骤,...

    用户6070864
  • Flink快速入门--安装与示例运行

    首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html

    实时计算
  • Flink on yarn初步讲解

    对于flink的基本概念和基本运行模式讲解的内容请参考这篇文章《Flink流式处理概念简介》。本文主要是讲解flink on yarn的运行原理及基本使用,后面...

    Spark学习技巧
  • www6669988com请拨18687679362_环球国际Flink源码走读(一):Flink工程目录

    导语 | Flink已经成为未来流计算趋势,目前在很多大厂已经有了大规模的使用。最近在学习Flink源码,就想把自己学习的过程分享出来,希望能帮助到志同道合的朋...

    用户7106032

扫码关注云+社区

领取腾讯云代金券