专栏首页程序猿的大杂烩Flink部署及作业提交(On Flink Standalone)

Flink部署及作业提交(On Flink Standalone)

Flink部署准备及源码编译

官方文档:

前置准备

用于编译源码的机器最好满足如下配置:

  • CPU > 4核
  • 内存 > 8G
  • Note:我这里使用的机器配置是4核8G,如果内存太小编译环节会发生OOM

部署Flink之前首先需要安装好JDK,可以选择8或11版本,我这里选择的是JDK11:

[root@flink01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@flink01 ~]# 

由于我们选择的是源码编译的方式安装Flink,所以还需要提前安装好Maven:

[root@flink01 /usr/local/src]# mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 11.0.8, vendor: Oracle Corporation, runtime: /usr/local/jdk/11
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@flink01 /usr/local/src]# 

Flink有个web-dashboard项目的编译需要依赖于NodeJS,所以也需要事先安装好:

[root@flink01 ~]# node -v
v12.18.4
[root@flink01 ~]# 

该项目的构建依赖于angular的cli工具,可以使用如下命令进行安装:

[root@flink01 ~]# npm install -g -registry=https://registry.npm.taobao.org @angular/cli
[root@flink01 ~]# ng --version

     _                      _                 ____ _     ___
    / \   _ __   __ _ _   _| | __ _ _ __     / ___| |   |_ _|
   / △ \ | '_ \ / _` | | | | |/ _` | '__|   | |   | |    | |
  / ___ \| | | | (_| | |_| | | (_| | |      | |___| |___ | |
 /_/   \_\_| |_|\__, |\__,_|_|\__,_|_|       \____|_____|___|
                |___/

Angular CLI: 10.1.3
Node: 12.18.4
OS: linux x64

Angular: 
... 
Ivy Workspace: 

Package                      Version
------------------------------------------------------
@angular-devkit/architect    0.1001.3 (cli-only)
@angular-devkit/core         10.1.3 (cli-only)
@angular-devkit/schematics   10.1.3 (cli-only)
@schematics/angular          10.1.3 (cli-only)
@schematics/update           0.1001.3 (cli-only)

[root@flink01 ~]# 

然后需要在Maven的配置文件中,配置如下两个仓库,cloudera仓库用于下载cdh发行版的Hadoop依赖:

  <mirrors>
    <!-- 配置阿里云的中央镜像仓库 -->
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>

...

  <profiles>
    <!-- 通过profile配置cloudera仓库 -->
    <profile>
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>

  <!-- 激活profile -->
  <activeProfiles>
    <activeProfile>cloudera-profile</activeProfile>
  </activeProfiles>

源码编译

Flink下载地址:

安装编译源码可能会用到的工具:

[root@flink01 ~]# yum install -y cmake3 git gcc-c++ ncurses-devel perl-Data-Dumper boost boost-doc boost-devel bzip2 openssl-devel libtirpc-devel.x86_64

打开下载页面,复制Flink源码包的下载地址,然后到Linux上通过wget命令进行下载:

[root@flink01 ~]# cd /usr/local/src
[root@flink01 /usr/local/src]# wget https://github.com/apache/flink/archive/release-1.11.2.tar.gz

解压下载好的源码包:

[root@flink01 /usr/local/src]# tar -zxvf flink-release-1.11.2.tar.gz
[root@flink01 /usr/local/src]# cd flink-release-1.11.2

由于flink-runtime-webweb-dashboard模块用到了NodeJS,在编译的过程中需要下载一些依赖的包,但默认的NodeJS仓库在国内几乎无法使用,所以需要更换为淘宝的NodeJS仓库,编辑pom.xml文件:

[root@flink01 /usr/local/src/flink-release-1.11.2]# vim flink-runtime-web/pom.xml

npm install 部分的arguments标签的内容由:

<arguments>ci --cache-max=0 --no-save</arguments>

改为:

<arguments>install -registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments>

然后就可以使用Maven编译源码文件了:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

但我这编译flink-runtime-web模块的时候报错了,错误提示如下:

[ERROR] Node.js version v10.9.0 detected.
[ERROR] The Angular CLI requires a minimum Node.js version of either v10.13 or v12.0.
[ERROR] 
[ERROR] Please update your Node.js version or visit https://nodejs.org/ for additional instructions.

错误原因很明显是NodeJS的版本太低了,因为flink-runtime-web/pom.xml文件中定义了使用v10.9.0这个版本的NodeJS,并没有使用我们自己安装好的,于是打开该文件,找到如下标签,修改一下版本号即可,我这里采用v10.13.0:

<nodeVersion>v10.13.0</nodeVersion>

然后重新进行编译:

[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

再次编译的过程中可能会输出了如下错误信息,但是编译仍然可以继续,并且最终的状态也是成功的。所以可以不用管:

[ERROR] Browserslist: caniuse-lite is outdated. Please run next command `npm update`

编译成功,会输出如下内容:

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink 1.11.2:
[INFO] 
[INFO] force-shading ...................................... SUCCESS [  0.721 s]
[INFO] flink .............................................. SUCCESS [  0.581 s]
[INFO] flink-annotations .................................. SUCCESS [  0.627 s]
[INFO] flink-test-utils-parent ............................ SUCCESS [  0.033 s]
[INFO] flink-test-utils-junit ............................. SUCCESS [  0.646 s]
[INFO] flink-metrics ...................................... SUCCESS [  0.032 s]
[INFO] flink-metrics-core ................................. SUCCESS [  0.360 s]
[INFO] flink-core ......................................... SUCCESS [  7.062 s]
[INFO] flink-java ......................................... SUCCESS [  1.520 s]
[INFO] flink-queryable-state .............................. SUCCESS [  0.025 s]
[INFO] flink-queryable-state-client-java .................. SUCCESS [  0.303 s]
[INFO] flink-filesystems .................................. SUCCESS [  0.023 s]
[INFO] flink-hadoop-fs .................................... SUCCESS [  1.031 s]
[INFO] flink-runtime ...................................... SUCCESS [ 24.936 s]
[INFO] flink-scala ........................................ SUCCESS [ 25.682 s]
[INFO] flink-mapr-fs ...................................... SUCCESS [  0.457 s]
[INFO] flink-filesystems :: flink-fs-hadoop-shaded ........ SUCCESS [  2.114 s]
[INFO] flink-s3-fs-base ................................... SUCCESS [  0.424 s]
[INFO] flink-s3-fs-hadoop ................................. SUCCESS [  3.012 s]
[INFO] flink-s3-fs-presto ................................. SUCCESS [  4.794 s]
[INFO] flink-swift-fs-hadoop .............................. SUCCESS [ 12.921 s]
[INFO] flink-oss-fs-hadoop ................................ SUCCESS [  3.700 s]
[INFO] flink-azure-fs-hadoop .............................. SUCCESS [ 15.227 s]
[INFO] flink-optimizer .................................... SUCCESS [  1.171 s]
[INFO] flink-streaming-java ............................... SUCCESS [  4.635 s]
[INFO] flink-clients ...................................... SUCCESS [  0.939 s]
[INFO] flink-test-utils ................................... SUCCESS [  0.634 s]
[INFO] flink-runtime-web .................................. SUCCESS [ 48.675 s]
[INFO] flink-examples ..................................... SUCCESS [  0.043 s]
[INFO] flink-examples-batch ............................... SUCCESS [  9.319 s]
[INFO] flink-connectors ................................... SUCCESS [  0.035 s]
[INFO] flink-hadoop-compatibility ......................... SUCCESS [  5.029 s]
[INFO] flink-state-backends ............................... SUCCESS [  0.018 s]
[INFO] flink-statebackend-rocksdb ......................... SUCCESS [  0.628 s]
[INFO] flink-tests ........................................ SUCCESS [ 22.051 s]
[INFO] flink-streaming-scala .............................. SUCCESS [ 23.293 s]
[INFO] flink-hcatalog ..................................... SUCCESS [  5.332 s]
[INFO] flink-table ........................................ SUCCESS [  0.019 s]
[INFO] flink-table-common ................................. SUCCESS [  1.505 s]
[INFO] flink-table-api-java ............................... SUCCESS [  0.820 s]
[INFO] flink-table-api-java-bridge ........................ SUCCESS [  0.393 s]
[INFO] flink-table-api-scala .............................. SUCCESS [ 10.990 s]
[INFO] flink-table-api-scala-bridge ....................... SUCCESS [  9.643 s]
[INFO] flink-sql-parser ................................... SUCCESS [ 17.153 s]
[INFO] flink-libraries .................................... SUCCESS [  0.018 s]
[INFO] flink-cep .......................................... SUCCESS [  1.447 s]
[INFO] flink-table-planner ................................ SUCCESS [01:12 min]
[INFO] flink-sql-parser-hive .............................. SUCCESS [  1.524 s]
[INFO] flink-table-runtime-blink .......................... SUCCESS [  2.073 s]
[INFO] flink-table-planner-blink .......................... SUCCESS [01:30 min]
[INFO] flink-metrics-jmx .................................. SUCCESS [  0.262 s]
[INFO] flink-formats ...................................... SUCCESS [  0.020 s]
[INFO] flink-json ......................................... SUCCESS [  0.500 s]
[INFO] flink-connector-kafka-base ......................... SUCCESS [  0.983 s]
[INFO] flink-avro ......................................... SUCCESS [  1.600 s]
[INFO] flink-csv .......................................... SUCCESS [  0.520 s]
[INFO] flink-connector-kafka-0.10 ......................... SUCCESS [  0.753 s]
[INFO] flink-connector-kafka-0.11 ......................... SUCCESS [  0.652 s]
[INFO] flink-connector-elasticsearch-base ................. SUCCESS [  0.807 s]
[INFO] flink-connector-elasticsearch5 ..................... SUCCESS [  8.900 s]
[INFO] flink-connector-elasticsearch6 ..................... SUCCESS [  0.691 s]
[INFO] flink-connector-elasticsearch7 ..................... SUCCESS [  0.702 s]
[INFO] flink-connector-hbase .............................. SUCCESS [  1.758 s]
[INFO] flink-hadoop-bulk .................................. SUCCESS [  0.576 s]
[INFO] flink-orc .......................................... SUCCESS [  0.828 s]
[INFO] flink-orc-nohive ................................... SUCCESS [  0.445 s]
[INFO] flink-parquet ...................................... SUCCESS [  0.992 s]
[INFO] flink-connector-hive ............................... SUCCESS [  2.614 s]
[INFO] flink-connector-jdbc ............................... SUCCESS [  0.857 s]
[INFO] flink-connector-rabbitmq ........................... SUCCESS [  0.256 s]
[INFO] flink-connector-twitter ............................ SUCCESS [  1.220 s]
[INFO] flink-connector-nifi ............................... SUCCESS [  0.309 s]
[INFO] flink-connector-cassandra .......................... SUCCESS [  2.280 s]
[INFO] flink-connector-filesystem ......................... SUCCESS [  0.742 s]
[INFO] flink-connector-kafka .............................. SUCCESS [  0.773 s]
[INFO] flink-connector-gcp-pubsub ......................... SUCCESS [ 50.078 s]
[INFO] flink-connector-kinesis ............................ SUCCESS [  5.358 s]
[INFO] flink-sql-connector-elasticsearch7 ................. SUCCESS [  4.625 s]
[INFO] flink-connector-base ............................... SUCCESS [  0.302 s]
[INFO] flink-sql-connector-elasticsearch6 ................. SUCCESS [  3.658 s]
[INFO] flink-sql-connector-kafka-0.10 ..................... SUCCESS [  0.236 s]
[INFO] flink-sql-connector-kafka-0.11 ..................... SUCCESS [  0.299 s]
[INFO] flink-sql-connector-kafka .......................... SUCCESS [  0.603 s]
[INFO] flink-sql-connector-hive-1.2.2 ..................... SUCCESS [  2.527 s]
[INFO] flink-sql-connector-hive-2.2.0 ..................... SUCCESS [  3.090 s]
[INFO] flink-sql-connector-hive-2.3.6 ..................... SUCCESS [  2.966 s]
[INFO] flink-sql-connector-hive-3.1.2 ..................... SUCCESS [  3.828 s]
[INFO] flink-avro-confluent-registry ...................... SUCCESS [ 24.666 s]
[INFO] flink-sequence-file ................................ SUCCESS [  0.397 s]
[INFO] flink-compress ..................................... SUCCESS [  0.393 s]
[INFO] flink-sql-orc ...................................... SUCCESS [  0.196 s]
[INFO] flink-sql-parquet .................................. SUCCESS [  0.352 s]
[INFO] flink-examples-streaming ........................... SUCCESS [ 21.793 s]
[INFO] flink-examples-table ............................... SUCCESS [  6.387 s]
[INFO] flink-examples-build-helper ........................ SUCCESS [  0.041 s]
[INFO] flink-examples-streaming-twitter ................... SUCCESS [  0.332 s]
[INFO] flink-examples-streaming-state-machine ............. SUCCESS [  0.319 s]
[INFO] flink-examples-streaming-gcp-pubsub ................ SUCCESS [  7.588 s]
[INFO] flink-container .................................... SUCCESS [  0.216 s]
[INFO] flink-queryable-state-runtime ...................... SUCCESS [  0.430 s]
[INFO] flink-mesos ........................................ SUCCESS [ 22.759 s]
[INFO] flink-kubernetes ................................... SUCCESS [01:55 min]
[INFO] flink-yarn ......................................... SUCCESS [  1.131 s]
[INFO] flink-gelly ........................................ SUCCESS [  1.344 s]
[INFO] flink-gelly-scala .................................. SUCCESS [ 13.956 s]
[INFO] flink-gelly-examples ............................... SUCCESS [ 11.946 s]
[INFO] flink-external-resources ........................... SUCCESS [  0.017 s]
[INFO] flink-external-resource-gpu ........................ SUCCESS [  0.154 s]
[INFO] flink-metrics-dropwizard ........................... SUCCESS [  5.900 s]
[INFO] flink-metrics-graphite ............................. SUCCESS [  3.591 s]
[INFO] flink-metrics-influxdb ............................. SUCCESS [01:53 min]
[INFO] flink-metrics-prometheus ........................... SUCCESS [ 44.165 s]
[INFO] flink-metrics-statsd ............................... SUCCESS [  0.156 s]
[INFO] flink-metrics-datadog .............................. SUCCESS [  0.158 s]
[INFO] flink-metrics-slf4j ................................ SUCCESS [  0.151 s]
[INFO] flink-cep-scala .................................... SUCCESS [  8.664 s]
[INFO] flink-table-uber ................................... SUCCESS [  3.683 s]
[INFO] flink-table-uber-blink ............................. SUCCESS [  4.093 s]
[INFO] flink-python ....................................... SUCCESS [01:53 min]
[INFO] flink-sql-client ................................... SUCCESS [  8.511 s]
[INFO] flink-state-processor-api .......................... SUCCESS [  0.590 s]
[INFO] flink-ml-parent .................................... SUCCESS [  0.018 s]
[INFO] flink-ml-api ....................................... SUCCESS [  0.159 s]
[INFO] flink-ml-lib ....................................... SUCCESS [  8.357 s]
[INFO] flink-ml-uber ...................................... SUCCESS [  0.076 s]
[INFO] flink-scala-shell .................................. SUCCESS [  9.027 s]
[INFO] flink-dist ......................................... SUCCESS [01:08 min]
[INFO] flink-yarn-tests ................................... SUCCESS [ 11.079 s]
[INFO] flink-end-to-end-tests ............................. SUCCESS [ 37.058 s]
[INFO] flink-cli-test ..................................... SUCCESS [  0.164 s]
[INFO] flink-parent-child-classloading-test-program ....... SUCCESS [  0.141 s]
[INFO] flink-parent-child-classloading-test-lib-package ... SUCCESS [  0.089 s]
[INFO] flink-dataset-allround-test ........................ SUCCESS [  0.140 s]
[INFO] flink-dataset-fine-grained-recovery-test ........... SUCCESS [  0.148 s]
[INFO] flink-datastream-allround-test ..................... SUCCESS [  0.745 s]
[INFO] flink-batch-sql-test ............................... SUCCESS [  0.142 s]
[INFO] flink-stream-sql-test .............................. SUCCESS [  0.148 s]
[INFO] flink-bucketing-sink-test .......................... SUCCESS [  0.315 s]
[INFO] flink-distributed-cache-via-blob ................... SUCCESS [  0.139 s]
[INFO] flink-high-parallelism-iterations-test ............. SUCCESS [  4.416 s]
[INFO] flink-stream-stateful-job-upgrade-test ............. SUCCESS [  0.513 s]
[INFO] flink-queryable-state-test ......................... SUCCESS [  0.981 s]
[INFO] flink-local-recovery-and-allocation-test ........... SUCCESS [  0.133 s]
[INFO] flink-elasticsearch5-test .......................... SUCCESS [  3.092 s]
[INFO] flink-elasticsearch6-test .......................... SUCCESS [  1.650 s]
[INFO] flink-quickstart ................................... SUCCESS [  0.263 s]
[INFO] flink-quickstart-java .............................. SUCCESS [ 16.713 s]
[INFO] flink-quickstart-scala ............................. SUCCESS [  0.057 s]
[INFO] flink-quickstart-test .............................. SUCCESS [  0.315 s]
[INFO] flink-confluent-schema-registry .................... SUCCESS [  1.014 s]
[INFO] flink-stream-state-ttl-test ........................ SUCCESS [  2.333 s]
[INFO] flink-sql-client-test .............................. SUCCESS [01:01 min]
[INFO] flink-streaming-file-sink-test ..................... SUCCESS [  0.130 s]
[INFO] flink-state-evolution-test ......................... SUCCESS [  0.527 s]
[INFO] flink-rocksdb-state-memory-control-test ............ SUCCESS [  0.495 s]
[INFO] flink-end-to-end-tests-common ...................... SUCCESS [  0.527 s]
[INFO] flink-metrics-availability-test .................... SUCCESS [  0.136 s]
[INFO] flink-metrics-reporter-prometheus-test ............. SUCCESS [  0.156 s]
[INFO] flink-heavy-deployment-stress-test ................. SUCCESS [  4.367 s]
[INFO] flink-connector-gcp-pubsub-emulator-tests .......... SUCCESS [02:09 min]
[INFO] flink-streaming-kafka-test-base .................... SUCCESS [  0.193 s]
[INFO] flink-streaming-kafka-test ......................... SUCCESS [  4.041 s]
[INFO] flink-streaming-kafka011-test ...................... SUCCESS [  3.555 s]
[INFO] flink-streaming-kafka010-test ...................... SUCCESS [  3.540 s]
[INFO] flink-plugins-test ................................. SUCCESS [  0.033 s]
[INFO] dummy-fs ........................................... SUCCESS [  0.084 s]
[INFO] another-dummy-fs ................................... SUCCESS [  0.074 s]
[INFO] flink-tpch-test .................................... SUCCESS [  5.635 s]
[INFO] flink-streaming-kinesis-test ....................... SUCCESS [  6.854 s]
[INFO] flink-elasticsearch7-test .......................... SUCCESS [  1.939 s]
[INFO] flink-end-to-end-tests-common-kafka ................ SUCCESS [  0.539 s]
[INFO] flink-tpcds-test ................................... SUCCESS [  0.345 s]
[INFO] flink-netty-shuffle-memory-control-test ............ SUCCESS [  0.144 s]
[INFO] flink-python-test .................................. SUCCESS [  3.675 s]
[INFO] flink-statebackend-heap-spillable .................. SUCCESS [  0.352 s]
[INFO] flink-contrib ...................................... SUCCESS [  0.019 s]
[INFO] flink-connector-wikiedits .......................... SUCCESS [  4.279 s]
[INFO] flink-fs-tests ..................................... SUCCESS [  0.509 s]
[INFO] flink-docs ......................................... SUCCESS [  5.049 s]
[INFO] flink-walkthroughs ................................. SUCCESS [  0.021 s]
[INFO] flink-walkthrough-common ........................... SUCCESS [  0.196 s]
[INFO] flink-walkthrough-datastream-java .................. SUCCESS [  0.053 s]
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [  0.050 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:47 min
[INFO] Finished at: 2020-09-29T01:20:10+08:00
[INFO] ------------------------------------------------------------------------

并且会生成一个目录,目录结构如下:

[root@flink01 /usr/local/src/flink-release-1.11.2]# ls flink-dist/target/flink-1.11.2-bin/flink-1.11.2/
bin  conf  examples  lib  LICENSE  log  opt  plugins  README.txt
[root@flink01 /usr/local/src/flink-release-1.11.2]# 

单机模式部署及代码提交测试

单机模式部署

首先配置一下hosts,将主机名与本地ip建立一个映射关系:

[root@flink01 ~]# vim /etc/hosts
192.168.243.148   flink01

Flink单机模式部署非常简单,只需要将之前编译生成的目录拷贝出来:

[root@flink01 /usr/local/src/flink-release-1.11.2]# cp -r flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

然后使用如下命令就可以启动Flink了:

[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host flink01.
Starting taskexecutor daemon on host flink01.
[root@flink01 /usr/local/flink]# jps  # 启动成功会有如下Java进程
2755 Jps
2389 StandaloneSessionClusterEntrypoint
2733 TaskManagerRunner
[root@flink01 /usr/local/flink]# 

与启动命令相对的停止命令如下:

$ ./bin/stop-cluster.sh

日志文件在log目录下,如果启动失败可以通过查看日志文件来排查问题:

[root@flink01 /usr/local/flink]# ls log/
flink-root-standalonesession-0-flink01.log  flink-root-standalonesession-0-flink01.out  flink-root-taskexecutor-0-flink01.log  flink-root-taskexecutor-0-flink01.out
[root@flink01 /usr/local/flink]# 

通过浏览器访问机器ip + 8081端口可以打开Flink的web界面控制台:

在侧边菜单栏中可以看到如下选项:

  • Overview:查看整体概览
  • Running Jobs:查看运行中的作业
  • Completed Jobs:查看已经完成的作业
  • TaskManager:查看TaskManager的系统信息
  • JobManager:查看JobManager的配置及日志信息
  • Submit New Job:可以在该页面中提交作业

Flink的整体架构图如下:

Flink 整个系统主要由两个组件组成,分别为 JobManager 和 TaskManager,Flink 架构也遵循 Master - Slave 架构设计原则,JobManager 为 Master 节点,TaskManager 为 Worker (Slave)节点,TaskManager 可以部署多个。其中,Flink Program是我们使用Flink框架编写的程序,是 TaskManager 具体要执行的任务,任务通过Client提交到集群中。

Client 客户端

Client负责将任务提交到集群,与 JobManager 构建 Akka 连接,然后将任务提交到 JobManager,通过和 JobManager 之间进行交互获取任务执行状态。 Client提交任务可以采用 CLI 方式或者通过使用 Flink WebUI 提交(菜单栏中的 Submit New Job),也可以在应用程序中指定 JobManager 的 RPC 网络端口构建 ExecutionEnvironment 来提交 Flink 应用。

JobManager

JobManager 负责整个 Flink 集群任务的调度以及资源的管理,从客户端中获取提交的应用,然后根据集群中 TaskManager 上 TaskSlot 的使用情况,为提交的应用分配相应的 TaskSlot 资源并命令 TaskManager 启动从客户端中获取的应用。 JobManager 相当于整个集群的 Master 节点,且整个集群有且只有一个活跃的 JobManager ,负责整个集群的任务管理和资源管理。 JobManager 和 TaskManager 之间通过 Actor System 进行通信,获取任务执行的情况并通过 Actor System 将应用的任务执行情况发送给客户端。 同时在任务执行的过程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完成 Checkpoint 操作,所有的 Checkpoint 协调过程都是在 Fink JobManager 中完成。 当任务完成后,Flink 会将任务执行的信息反馈给客户端,并且释放掉 TaskManager 中的资源以供下一次提交任务使用。

TaskManager

TaskManager 相当于整个集群的 Slave 节点,负责具体的任务执行和对应任务在每个节点上的资源申请和管理。 客户端通过将编写好的 Flink 应用编译打包,提交到 JobManager,然后 JobManager 会根据已注册在 JobManager 中 TaskManager 的资源情况,将任务分配给有资源的 TaskManager节点,然后启动并运行任务。 TaskManager 从 JobManager 接收需要部署的任务,然后使用 Slot 资源启动 Task,建立数据接入的网络连接,接收数据并开始数据处理。同时 TaskManager 之间的数据交互都是通过数据流的方式进行的。 可以看出,Flink 的任务运行其实是采用多线程的方式,这和 MapReduce 多 JVM 进行的方式有很大的区别,Flink 能够极大提高 CPU 使用效率,在多个任务和 Task 之间通过 TaskSlot 方式共享系统资源,每个 TaskManager 中通过管理多个 TaskSlot 资源池进行对资源进行有效管理。


代码提交测试

将Flink部署完成并了解了Flink的基本组件概念后,我们可以将Flink自带的一些示例代码提交到集群中测试是否能正常运行。示例代码的目录如下:

[root@flink01 /usr/local/flink]# ls examples/
batch  gelly  python  streaming  table
[root@flink01 /usr/local/flink]# ls examples/streaming/
IncrementalLearning.jar  Iteration.jar  SessionWindowing.jar  SocketWindowWordCount.jar  StateMachineExample.jar  TopSpeedWindowing.jar  Twitter.jar  WindowJoin.jar  WordCount.jar
[root@flink01 /usr/local/flink]# 

我这里采用examples/streaming/SocketWindowWordCount.jar作为测试,该示例代码用于读取Socket流并按照分隔符分隔单词,完成词频统计的功能。为了能够模拟Socket流,我们需要安装一下netcat工具,安装命令如下:

$ yum install -y nc

使用nc命令启动一个Socket监听9999端口,一会我们就可以通过这个Socket写入数据:

$ nc -lk 9999

然后将示例代码提交到Flink中运行:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID c90a28408eae654a143745903cbaa3eb

代码提交成功后,此时在界面上就可以看到有一个Job正在运行中:

点进去可以查看详细信息:

nc命令创建的Socket中写入一些数据:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

在如下文件中可以看到词频统计后的输出结果:

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-0-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]# 

到此为止我们就测试完了,此时我们要怎么停止这个任务呢?建议不要直接Ctrl + c,可以到web界面上点击“Cancel Job”就可以让Job停止运行:


官方文档:

上一小节演示了Flink的单机模式部署,但在生产环境我们往往都是需要分布式部署的,而Flink也提供了Standalone模式部署,即独立集群。Flink Standalone模式的拓扑图:

为了演示Standalone分布式模式的部署,至少需要两台机器,所以我这里新增一台hostname为 flink02 的机器。目前的机器配置如下:

IP

Hostname

角色

192.168.243.148

flink01

master(JobManager) / worker(TaskManager)

192.168.243.150

flink02

worker(TaskManager)

  • Tips:新增的 flink02 也需要具备Java运行环境

系统配置(所有节点)

配置hosts,将主机名与本地ip建立一个映射关系,使所有节点之间可以通过hostname互相访问:

$ vim /etc/hosts
192.168.243.148   flink01
192.168.243.150   flink02

关闭防火墙:

$ systemctl stop firewalld && systemctl disable firewalld

配置所有节点之间的免密登录:

[root@flink01 ~]# ssh-keygen -t rsa      # 生成密钥对
[root@flink01 ~]# ssh-copy-id flink01    # 拷贝公钥并追加到自己的授权列表文件中
[root@flink01 ~]# ssh-copy-id flink02    # 拷贝公钥并追加到flink02的授权列表文件中
  • flink02 上也重复同样的操作,这里就不重复演示了

然后测试一下能否免密登录,可以看到我这里登录 flink02 节点不需要输入密码:

[root@flink01 ~]# ssh flink02
Last login: Tue Sep 29 14:22:20 2020 from 192.168.243.1
[root@flink02 ~]#

配置 master 节点

flink01 上修改一下配置文件中的几个配置项:

[root@flink01 /usr/local/flink]# vim conf/flink-conf.yaml
jobmanager.rpc.address: flink01
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
io.tmp.dirs: /usr/local/flink/tmp_data

创建临时目录:

[root@flink01 /usr/local/flink]# mkdir tmp_data

简单说明下这几个参数:

  • jobmanager.rpc.address:指定master节点的ip地址或hostname
  • jobmanager.memory.process.size:指定JobManager节点可用的内存
  • taskmanager.memory.process.size:指定TaskManager节点可用的内存
  • taskmanager.numberOfTaskSlots:指定每台机器可用的CPU核心数
  • parallelism.default:集群中的CPU总数,也就是任务的并行度
  • io.tmp.dirs:TaskManager的临时数据存储目录
  • 有关配置参数的更多内容可以参考官方文档:Configuration

然后还需要配置 worker 节点的IP或hostname:

[root@flink01 /usr/local/flink]# vim conf/workers
flink01
flink02

重启服务:

[root@flink01 /usr/local/flink]# ./bin/stop-cluster.sh
[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh

配置 worker 节点

flink 目录拷贝到 flink02 上,在 flink02 上执行如下命令:

[root@flink02 ~]# scp -r flink01:/usr/local/flink /usr/local/flink

创建临时目录:

[root@flink02 ~]# cd /usr/local/flink/
[root@flink02 /usr/local/flink]# mkdir tmp_data

启动TaskManager服务:

[root@flink02 /usr/local/flink]# ./bin/taskmanager.sh start
Starting taskexecutor daemon on host flink02.
[root@flink02 /usr/local/flink]# jps
4757 Jps
4701 TaskManagerRunner
[root@flink02 /usr/local/flink]# 

此时在dashboard上就可以看到TaskManager节点数量为2了:

在“Task Manager”页面中也可以看到两个节点的信息:

如果需要新增更多的TaskManager节点,也是按照这种方式添加就可以了,非常简单。接下来我们测试一下提交任务到集群中是否能够正常运行。先使用nc命令创建一个Socket并写入一些数据:

[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

然后提交任务:

[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID 641d5e7e0bd572ba4114ea5e69b8644c

在如下文件中可以看到词频统计后的输出结果,代表任务是能够正常运行在Flink的Standalone模式上的:

[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-1-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]# 

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 编译安装大数据平台权限管理组件 - Apache Ranger 3.x

    进入源码目录:cd ranger,修改该目录下的pom文件,主要修改两个地方,第一是将仓库相关配置都给注释掉:

    端碗吹水
  • JavaScript—内置对象

    如果文档包含框架(frame 或 iframe 标签),浏览器会为 HTML 文档创建一个 window 对象,并为每个框架创建一个额外的 window 对象。

    端碗吹水
  • 基本数据类型封装类

    封装类里面的方法和特性都差不多,只要学会其中一个其他的也就会了,一般来讲用得比较多的是Integer,其他则用的比较少。

    端碗吹水
  • 尝尝Blink

    期待 Flink 1.9 整合 Flink 和 Blink 的版本。突然心血来潮,打算自己编一版 Blink 玩玩,这篇文章分为两个部分:

    麒思妙想
  • 手动编译 Flink 1.9 踩坑实录

    大家期盼已久的1.9已经剪支有些日子了,兴冲冲的切换到跑去编译,我在之前的文章《尝尝Blink》里也介绍过如何编译,本文只针对不同的地方以及遇到的坑做一些说明,...

    麒思妙想
  • Flink1.8源码编译安装

    这里我们要介绍的是源码编译的方式,我们需要直接从github上下载最新的relese1.8版本源码或者通过git clone命令来拉取,如下:

    我是攻城师
  • Apache Beam的Docker Demo

    Apache Beam 是统一的批/流数据处理的编程模型。本文主要是参考官方文档,用 Docker 来快速跑起来一个用 Beam 来构建的 Flink 程序来处...

    runzhliu
  • flink实战教程-集群的部署

    这种模式我们一般是在用IDE调试程序的时候用到,当我们在本地用IDE开发程序的时候,执行main方法,flink会在本地启动一个包含jobmanager和tas...

    大数据技术与应用实战
  • 妈妈再也不用担心,我学不会大数据 flink 啦

    面对霸气侧漏的业务需求,由于没有大数据知识储备,咱心里没底,咱也不敢问,咱也不敢说,只能静下来默默储备、默默寻觅解决方案。

    一猿小讲
  • flink实战-实时计算平台通过api停止流任务

    今天我们主要讲一下如何通过api的方式来停止一个通过per job模式部署在yarn集群上的任务。

    大数据技术与应用实战

扫码关注云+社区

领取腾讯云代金券