Flink Deployment and Job Submission (On Flink Standalone)


Flink Deployment Preparation and Building from Source

Official documentation:

Prerequisites

The machine used to build the source should ideally meet the following specs:

  • CPU: 4 cores or more
  • Memory: 8 GB or more
  • Note: my machine has 4 cores and 8 GB of RAM; if memory is too small, the build step will fail with an OOM

Before deploying Flink, a JDK must be installed first; either version 8 or 11 works. I went with JDK 11:
[root@flink01 ~]# java -version
java version "11.0.8" 2020-07-14 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
[root@flink01 ~]# 
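If a JDK still needs to be installed, one quick option on CentOS 7 is OpenJDK from the base yum repositories (a sketch; the transcript above shows an Oracle JDK, but OpenJDK 11 builds Flink just as well):

$ yum install -y java-11-openjdk-devel   # installs OpenJDK 11, including javac
$ java -version                          # verify the installation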

Since we're installing Flink by building it from source, Maven also needs to be installed in advance:
[root@flink01 /usr/local/src]# mvn --version
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /usr/local/maven
Java version: 11.0.8, vendor: Oracle Corporation, runtime: /usr/local/jdk/11
Default locale: zh_CN, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.el7.x86_64", arch: "amd64", family: "unix"
[root@flink01 /usr/local/src]# 
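If Maven isn't present yet, a typical manual install looks like this (a sketch, assuming /usr/local/maven as the install path to match the Maven home shown above):

$ cd /usr/local/src
$ wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
$ tar -zxvf apache-maven-3.6.3-bin.tar.gz
$ mv apache-maven-3.6.3 /usr/local/maven
$ echo 'export PATH=$PATH:/usr/local/maven/bin' >> /etc/profile && source /etc/profile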

Flink's web-dashboard project requires NodeJS to build, so it also needs to be installed beforehand:
[root@flink01 ~]# node -v
v12.18.4
[root@flink01 ~]# 
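If NodeJS is missing, the prebuilt binaries from nodejs.org are the simplest route (a sketch, assuming the v12.18.4 build shown above and /usr/local/node as the install path):

$ cd /usr/local/src
$ wget https://nodejs.org/dist/v12.18.4/node-v12.18.4-linux-x64.tar.xz
$ tar -xJf node-v12.18.4-linux-x64.tar.xz
$ mv node-v12.18.4-linux-x64 /usr/local/node
$ echo 'export PATH=$PATH:/usr/local/node/bin' >> /etc/profile && source /etc/profile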

The web-dashboard build relies on Angular's CLI tool, which can be installed with the following command:

[root@flink01 ~]# npm install -g --registry=https://registry.npm.taobao.org @angular/cli
[root@flink01 ~]# ng --version

     _                      _                 ____ _     ___
    / \   _ __   __ _ _   _| | __ _ _ __     / ___| |   |_ _|
   / △ \ | '_ \ / _` | | | | |/ _` | '__|   | |   | |    | |
  / ___ \| | | | (_| | |_| | | (_| | |      | |___| |___ | |
 /_/   \_\_| |_|\__, |\__,_|_|\__,_|_|       \____|_____|___|
                |___/

Angular CLI: 10.1.3
Node: 12.18.4
OS: linux x64

Angular: 
... 
Ivy Workspace: 

Package                      Version
------------------------------------------------------
@angular-devkit/architect    0.1001.3 (cli-only)
@angular-devkit/core         10.1.3 (cli-only)
@angular-devkit/schematics   10.1.3 (cli-only)
@schematics/angular          10.1.3 (cli-only)
@schematics/update           0.1001.3 (cli-only)

[root@flink01 ~]# 

Next, configure the following two repositories in Maven's settings.xml; the cloudera repository is used to download the CDH build of the Hadoop dependencies:
  <mirrors>
    <!-- Mirror Maven Central via the Aliyun mirror -->
    <mirror>
      <id>nexus-aliyun</id>
      <mirrorOf>central</mirrorOf>
      <name>Nexus aliyun</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
  </mirrors>

...

  <profiles>
    <!-- Configure the cloudera repository via a profile -->
    <profile>
      <id>cloudera-profile</id>
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
          <releases>
            <enabled>true</enabled>
          </releases>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
        </repository>
      </repositories>
    </profile>
  </profiles>

  <!-- Activate the cloudera profile -->
  <activeProfiles>
    <activeProfile>cloudera-profile</activeProfile>
  </activeProfiles>
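To confirm the profile is actually picked up, Maven's help plugin can list the active profiles (an optional sanity check):

$ mvn help:active-profiles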

Building from Source

Flink download page:

Install the tools that may be needed while building the source:
[root@flink01 ~]# yum install -y cmake3 git gcc-c++ ncurses-devel perl-Data-Dumper boost boost-doc boost-devel bzip2 openssl-devel libtirpc-devel.x86_64

Open the download page, copy the URL of the Flink source package, then download it on the Linux machine with wget:
[root@flink01 ~]# cd /usr/local/src
[root@flink01 /usr/local/src]# wget https://github.com/apache/flink/archive/release-1.11.2.tar.gz

Unpack the downloaded source package:
[root@flink01 /usr/local/src]# tar -zxvf flink-release-1.11.2.tar.gz
[root@flink01 /usr/local/src]# cd flink-release-1.11.2

The web-dashboard in the flink-runtime-web module uses NodeJS, so the build needs to download a number of npm dependencies. The default npm registry is nearly unusable from within China, so switch to Taobao's npm mirror by editing the module's pom.xml:
[root@flink01 /usr/local/src/flink-release-1.11.2]# vim flink-runtime-web/pom.xml

Change the content of the arguments tag in the npm install section from:
<arguments>ci --cache-max=0 --no-save</arguments>

to:

<arguments>install --registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments>
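Alternatively (a sketch, assuming the npm invoked by the build reads the user-level ~/.npmrc, which it normally does), the registry can be set globally instead of editing the pom:

$ npm config set registry https://registry.npm.taobao.org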

Now the source can be built with Maven; -Dhadoop.version selects the CDH Hadoop build configured earlier, and -Dfast skips Flink's QA plugins (checkstyle and the like) to speed things up:
[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

However, the build failed for me while compiling the flink-runtime-web module, with the following error:
[ERROR] Node.js version v10.9.0 detected.
[ERROR] The Angular CLI requires a minimum Node.js version of either v10.13 or v12.0.
[ERROR] 
[ERROR] Please update your Node.js version or visit https://nodejs.org/ for additional instructions.

The cause is clear: the NodeJS version is too old. flink-runtime-web/pom.xml pins NodeJS to v10.9.0 instead of using the version we installed ourselves. Open that file, find the following tag, and raise the version number; I went with v10.13.0:
<nodeVersion>v10.13.0</nodeVersion>

Then rebuild:
[root@flink01 /usr/local/src/flink-release-1.11.2]# mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.15.1 -Dfast

The second build may print the following error message, but compilation continues and still finishes successfully, so it can be ignored:
[ERROR] Browserslist: caniuse-lite is outdated. Please run next command `npm update`

On success, the build prints output like the following:
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for flink 1.11.2:
[INFO] 
[INFO] force-shading ...................................... SUCCESS [  0.721 s]
[INFO] flink .............................................. SUCCESS [  0.581 s]
[INFO] flink-annotations .................................. SUCCESS [  0.627 s]
[INFO] flink-test-utils-parent ............................ SUCCESS [  0.033 s]
[INFO] flink-test-utils-junit ............................. SUCCESS [  0.646 s]
[INFO] flink-metrics ...................................... SUCCESS [  0.032 s]
[INFO] flink-metrics-core ................................. SUCCESS [  0.360 s]
[INFO] flink-core ......................................... SUCCESS [  7.062 s]
[INFO] flink-java ......................................... SUCCESS [  1.520 s]
[INFO] flink-queryable-state .............................. SUCCESS [  0.025 s]
[INFO] flink-queryable-state-client-java .................. SUCCESS [  0.303 s]
[INFO] flink-filesystems .................................. SUCCESS [  0.023 s]
[INFO] flink-hadoop-fs .................................... SUCCESS [  1.031 s]
[INFO] flink-runtime ...................................... SUCCESS [ 24.936 s]
[INFO] flink-scala ........................................ SUCCESS [ 25.682 s]
[INFO] flink-mapr-fs ...................................... SUCCESS [  0.457 s]
[INFO] flink-filesystems :: flink-fs-hadoop-shaded ........ SUCCESS [  2.114 s]
[INFO] flink-s3-fs-base ................................... SUCCESS [  0.424 s]
[INFO] flink-s3-fs-hadoop ................................. SUCCESS [  3.012 s]
[INFO] flink-s3-fs-presto ................................. SUCCESS [  4.794 s]
[INFO] flink-swift-fs-hadoop .............................. SUCCESS [ 12.921 s]
[INFO] flink-oss-fs-hadoop ................................ SUCCESS [  3.700 s]
[INFO] flink-azure-fs-hadoop .............................. SUCCESS [ 15.227 s]
[INFO] flink-optimizer .................................... SUCCESS [  1.171 s]
[INFO] flink-streaming-java ............................... SUCCESS [  4.635 s]
[INFO] flink-clients ...................................... SUCCESS [  0.939 s]
[INFO] flink-test-utils ................................... SUCCESS [  0.634 s]
[INFO] flink-runtime-web .................................. SUCCESS [ 48.675 s]
[INFO] flink-examples ..................................... SUCCESS [  0.043 s]
[INFO] flink-examples-batch ............................... SUCCESS [  9.319 s]
[INFO] flink-connectors ................................... SUCCESS [  0.035 s]
[INFO] flink-hadoop-compatibility ......................... SUCCESS [  5.029 s]
[INFO] flink-state-backends ............................... SUCCESS [  0.018 s]
[INFO] flink-statebackend-rocksdb ......................... SUCCESS [  0.628 s]
[INFO] flink-tests ........................................ SUCCESS [ 22.051 s]
[INFO] flink-streaming-scala .............................. SUCCESS [ 23.293 s]
[INFO] flink-hcatalog ..................................... SUCCESS [  5.332 s]
[INFO] flink-table ........................................ SUCCESS [  0.019 s]
[INFO] flink-table-common ................................. SUCCESS [  1.505 s]
[INFO] flink-table-api-java ............................... SUCCESS [  0.820 s]
[INFO] flink-table-api-java-bridge ........................ SUCCESS [  0.393 s]
[INFO] flink-table-api-scala .............................. SUCCESS [ 10.990 s]
[INFO] flink-table-api-scala-bridge ....................... SUCCESS [  9.643 s]
[INFO] flink-sql-parser ................................... SUCCESS [ 17.153 s]
[INFO] flink-libraries .................................... SUCCESS [  0.018 s]
[INFO] flink-cep .......................................... SUCCESS [  1.447 s]
[INFO] flink-table-planner ................................ SUCCESS [01:12 min]
[INFO] flink-sql-parser-hive .............................. SUCCESS [  1.524 s]
[INFO] flink-table-runtime-blink .......................... SUCCESS [  2.073 s]
[INFO] flink-table-planner-blink .......................... SUCCESS [01:30 min]
[INFO] flink-metrics-jmx .................................. SUCCESS [  0.262 s]
[INFO] flink-formats ...................................... SUCCESS [  0.020 s]
[INFO] flink-json ......................................... SUCCESS [  0.500 s]
[INFO] flink-connector-kafka-base ......................... SUCCESS [  0.983 s]
[INFO] flink-avro ......................................... SUCCESS [  1.600 s]
[INFO] flink-csv .......................................... SUCCESS [  0.520 s]
[INFO] flink-connector-kafka-0.10 ......................... SUCCESS [  0.753 s]
[INFO] flink-connector-kafka-0.11 ......................... SUCCESS [  0.652 s]
[INFO] flink-connector-elasticsearch-base ................. SUCCESS [  0.807 s]
[INFO] flink-connector-elasticsearch5 ..................... SUCCESS [  8.900 s]
[INFO] flink-connector-elasticsearch6 ..................... SUCCESS [  0.691 s]
[INFO] flink-connector-elasticsearch7 ..................... SUCCESS [  0.702 s]
[INFO] flink-connector-hbase .............................. SUCCESS [  1.758 s]
[INFO] flink-hadoop-bulk .................................. SUCCESS [  0.576 s]
[INFO] flink-orc .......................................... SUCCESS [  0.828 s]
[INFO] flink-orc-nohive ................................... SUCCESS [  0.445 s]
[INFO] flink-parquet ...................................... SUCCESS [  0.992 s]
[INFO] flink-connector-hive ............................... SUCCESS [  2.614 s]
[INFO] flink-connector-jdbc ............................... SUCCESS [  0.857 s]
[INFO] flink-connector-rabbitmq ........................... SUCCESS [  0.256 s]
[INFO] flink-connector-twitter ............................ SUCCESS [  1.220 s]
[INFO] flink-connector-nifi ............................... SUCCESS [  0.309 s]
[INFO] flink-connector-cassandra .......................... SUCCESS [  2.280 s]
[INFO] flink-connector-filesystem ......................... SUCCESS [  0.742 s]
[INFO] flink-connector-kafka .............................. SUCCESS [  0.773 s]
[INFO] flink-connector-gcp-pubsub ......................... SUCCESS [ 50.078 s]
[INFO] flink-connector-kinesis ............................ SUCCESS [  5.358 s]
[INFO] flink-sql-connector-elasticsearch7 ................. SUCCESS [  4.625 s]
[INFO] flink-connector-base ............................... SUCCESS [  0.302 s]
[INFO] flink-sql-connector-elasticsearch6 ................. SUCCESS [  3.658 s]
[INFO] flink-sql-connector-kafka-0.10 ..................... SUCCESS [  0.236 s]
[INFO] flink-sql-connector-kafka-0.11 ..................... SUCCESS [  0.299 s]
[INFO] flink-sql-connector-kafka .......................... SUCCESS [  0.603 s]
[INFO] flink-sql-connector-hive-1.2.2 ..................... SUCCESS [  2.527 s]
[INFO] flink-sql-connector-hive-2.2.0 ..................... SUCCESS [  3.090 s]
[INFO] flink-sql-connector-hive-2.3.6 ..................... SUCCESS [  2.966 s]
[INFO] flink-sql-connector-hive-3.1.2 ..................... SUCCESS [  3.828 s]
[INFO] flink-avro-confluent-registry ...................... SUCCESS [ 24.666 s]
[INFO] flink-sequence-file ................................ SUCCESS [  0.397 s]
[INFO] flink-compress ..................................... SUCCESS [  0.393 s]
[INFO] flink-sql-orc ...................................... SUCCESS [  0.196 s]
[INFO] flink-sql-parquet .................................. SUCCESS [  0.352 s]
[INFO] flink-examples-streaming ........................... SUCCESS [ 21.793 s]
[INFO] flink-examples-table ............................... SUCCESS [  6.387 s]
[INFO] flink-examples-build-helper ........................ SUCCESS [  0.041 s]
[INFO] flink-examples-streaming-twitter ................... SUCCESS [  0.332 s]
[INFO] flink-examples-streaming-state-machine ............. SUCCESS [  0.319 s]
[INFO] flink-examples-streaming-gcp-pubsub ................ SUCCESS [  7.588 s]
[INFO] flink-container .................................... SUCCESS [  0.216 s]
[INFO] flink-queryable-state-runtime ...................... SUCCESS [  0.430 s]
[INFO] flink-mesos ........................................ SUCCESS [ 22.759 s]
[INFO] flink-kubernetes ................................... SUCCESS [01:55 min]
[INFO] flink-yarn ......................................... SUCCESS [  1.131 s]
[INFO] flink-gelly ........................................ SUCCESS [  1.344 s]
[INFO] flink-gelly-scala .................................. SUCCESS [ 13.956 s]
[INFO] flink-gelly-examples ............................... SUCCESS [ 11.946 s]
[INFO] flink-external-resources ........................... SUCCESS [  0.017 s]
[INFO] flink-external-resource-gpu ........................ SUCCESS [  0.154 s]
[INFO] flink-metrics-dropwizard ........................... SUCCESS [  5.900 s]
[INFO] flink-metrics-graphite ............................. SUCCESS [  3.591 s]
[INFO] flink-metrics-influxdb ............................. SUCCESS [01:53 min]
[INFO] flink-metrics-prometheus ........................... SUCCESS [ 44.165 s]
[INFO] flink-metrics-statsd ............................... SUCCESS [  0.156 s]
[INFO] flink-metrics-datadog .............................. SUCCESS [  0.158 s]
[INFO] flink-metrics-slf4j ................................ SUCCESS [  0.151 s]
[INFO] flink-cep-scala .................................... SUCCESS [  8.664 s]
[INFO] flink-table-uber ................................... SUCCESS [  3.683 s]
[INFO] flink-table-uber-blink ............................. SUCCESS [  4.093 s]
[INFO] flink-python ....................................... SUCCESS [01:53 min]
[INFO] flink-sql-client ................................... SUCCESS [  8.511 s]
[INFO] flink-state-processor-api .......................... SUCCESS [  0.590 s]
[INFO] flink-ml-parent .................................... SUCCESS [  0.018 s]
[INFO] flink-ml-api ....................................... SUCCESS [  0.159 s]
[INFO] flink-ml-lib ....................................... SUCCESS [  8.357 s]
[INFO] flink-ml-uber ...................................... SUCCESS [  0.076 s]
[INFO] flink-scala-shell .................................. SUCCESS [  9.027 s]
[INFO] flink-dist ......................................... SUCCESS [01:08 min]
[INFO] flink-yarn-tests ................................... SUCCESS [ 11.079 s]
[INFO] flink-end-to-end-tests ............................. SUCCESS [ 37.058 s]
[INFO] flink-cli-test ..................................... SUCCESS [  0.164 s]
[INFO] flink-parent-child-classloading-test-program ....... SUCCESS [  0.141 s]
[INFO] flink-parent-child-classloading-test-lib-package ... SUCCESS [  0.089 s]
[INFO] flink-dataset-allround-test ........................ SUCCESS [  0.140 s]
[INFO] flink-dataset-fine-grained-recovery-test ........... SUCCESS [  0.148 s]
[INFO] flink-datastream-allround-test ..................... SUCCESS [  0.745 s]
[INFO] flink-batch-sql-test ............................... SUCCESS [  0.142 s]
[INFO] flink-stream-sql-test .............................. SUCCESS [  0.148 s]
[INFO] flink-bucketing-sink-test .......................... SUCCESS [  0.315 s]
[INFO] flink-distributed-cache-via-blob ................... SUCCESS [  0.139 s]
[INFO] flink-high-parallelism-iterations-test ............. SUCCESS [  4.416 s]
[INFO] flink-stream-stateful-job-upgrade-test ............. SUCCESS [  0.513 s]
[INFO] flink-queryable-state-test ......................... SUCCESS [  0.981 s]
[INFO] flink-local-recovery-and-allocation-test ........... SUCCESS [  0.133 s]
[INFO] flink-elasticsearch5-test .......................... SUCCESS [  3.092 s]
[INFO] flink-elasticsearch6-test .......................... SUCCESS [  1.650 s]
[INFO] flink-quickstart ................................... SUCCESS [  0.263 s]
[INFO] flink-quickstart-java .............................. SUCCESS [ 16.713 s]
[INFO] flink-quickstart-scala ............................. SUCCESS [  0.057 s]
[INFO] flink-quickstart-test .............................. SUCCESS [  0.315 s]
[INFO] flink-confluent-schema-registry .................... SUCCESS [  1.014 s]
[INFO] flink-stream-state-ttl-test ........................ SUCCESS [  2.333 s]
[INFO] flink-sql-client-test .............................. SUCCESS [01:01 min]
[INFO] flink-streaming-file-sink-test ..................... SUCCESS [  0.130 s]
[INFO] flink-state-evolution-test ......................... SUCCESS [  0.527 s]
[INFO] flink-rocksdb-state-memory-control-test ............ SUCCESS [  0.495 s]
[INFO] flink-end-to-end-tests-common ...................... SUCCESS [  0.527 s]
[INFO] flink-metrics-availability-test .................... SUCCESS [  0.136 s]
[INFO] flink-metrics-reporter-prometheus-test ............. SUCCESS [  0.156 s]
[INFO] flink-heavy-deployment-stress-test ................. SUCCESS [  4.367 s]
[INFO] flink-connector-gcp-pubsub-emulator-tests .......... SUCCESS [02:09 min]
[INFO] flink-streaming-kafka-test-base .................... SUCCESS [  0.193 s]
[INFO] flink-streaming-kafka-test ......................... SUCCESS [  4.041 s]
[INFO] flink-streaming-kafka011-test ...................... SUCCESS [  3.555 s]
[INFO] flink-streaming-kafka010-test ...................... SUCCESS [  3.540 s]
[INFO] flink-plugins-test ................................. SUCCESS [  0.033 s]
[INFO] dummy-fs ........................................... SUCCESS [  0.084 s]
[INFO] another-dummy-fs ................................... SUCCESS [  0.074 s]
[INFO] flink-tpch-test .................................... SUCCESS [  5.635 s]
[INFO] flink-streaming-kinesis-test ....................... SUCCESS [  6.854 s]
[INFO] flink-elasticsearch7-test .......................... SUCCESS [  1.939 s]
[INFO] flink-end-to-end-tests-common-kafka ................ SUCCESS [  0.539 s]
[INFO] flink-tpcds-test ................................... SUCCESS [  0.345 s]
[INFO] flink-netty-shuffle-memory-control-test ............ SUCCESS [  0.144 s]
[INFO] flink-python-test .................................. SUCCESS [  3.675 s]
[INFO] flink-statebackend-heap-spillable .................. SUCCESS [  0.352 s]
[INFO] flink-contrib ...................................... SUCCESS [  0.019 s]
[INFO] flink-connector-wikiedits .......................... SUCCESS [  4.279 s]
[INFO] flink-fs-tests ..................................... SUCCESS [  0.509 s]
[INFO] flink-docs ......................................... SUCCESS [  5.049 s]
[INFO] flink-walkthroughs ................................. SUCCESS [  0.021 s]
[INFO] flink-walkthrough-common ........................... SUCCESS [  0.196 s]
[INFO] flink-walkthrough-datastream-java .................. SUCCESS [  0.053 s]
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [  0.050 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  24:47 min
[INFO] Finished at: 2020-09-29T01:20:10+08:00
[INFO] ------------------------------------------------------------------------

It also produces a distribution directory with the following structure:
[root@flink01 /usr/local/src/flink-release-1.11.2]# ls flink-dist/target/flink-1.11.2-bin/flink-1.11.2/
bin  conf  examples  lib  LICENSE  log  opt  plugins  README.txt
[root@flink01 /usr/local/src/flink-release-1.11.2]# 

Single-Node Deployment and Job Submission Test

Single-Node Deployment

First configure hosts, mapping the hostname to the local IP:
[root@flink01 ~]# vim /etc/hosts
192.168.243.148   flink01

Deploying Flink in single-node mode is very simple: just copy the directory generated by the build to its install location:
[root@flink01 /usr/local/src/flink-release-1.11.2]# cp -r flink-dist/target/flink-1.11.2-bin/flink-1.11.2/ /usr/local/flink

Flink can then be started with the following command:
[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host flink01.
Starting taskexecutor daemon on host flink01.
[root@flink01 /usr/local/flink]# jps  # a successful start shows these Java processes
2755 Jps
2389 StandaloneSessionClusterEntrypoint
2733 TaskManagerRunner
[root@flink01 /usr/local/flink]# 
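At this point the web UI port should also be listening (an optional sanity check; 8081 is the default REST/UI port):

$ ss -lntp | grep 8081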

The matching command to stop the cluster is:
$ ./bin/stop-cluster.sh

Log files live in the log directory; if startup fails, check them to troubleshoot:
[root@flink01 /usr/local/flink]# ls log/
flink-root-standalonesession-0-flink01.log  flink-root-standalonesession-0-flink01.out  flink-root-taskexecutor-0-flink01.log  flink-root-taskexecutor-0-flink01.out
[root@flink01 /usr/local/flink]# 

Open Flink's web console in a browser at http://<machine-ip>:8081:

[Screenshot: Flink web dashboard]

The following options are available in the sidebar menu:

[Screenshot: Flink web dashboard side menu]
  • Overview: overall cluster summary
  • Running Jobs: jobs currently running
  • Completed Jobs: jobs that have finished
  • TaskManager: system information for each TaskManager
  • JobManager: JobManager configuration and logs
  • Submit New Job: submit a job from this page

Flink's overall architecture is shown below:

[Figure: Flink architecture diagram]

The Flink system consists of two main components, the JobManager and the TaskManager, following the classic master-slave design: the JobManager is the master node and the TaskManagers are the worker (slave) nodes, of which multiple can be deployed. The Flink Program is the application we write with the Flink framework, i.e. the actual work the TaskManagers execute, and it is submitted to the cluster through the Client.

Client

The Client is responsible for submitting jobs to the cluster: it establishes an Akka connection with the JobManager, submits the job, and then interacts with the JobManager to track the job's execution status. Jobs can be submitted via the CLI or the Flink Web UI (Submit New Job in the menu), or from an application by building an ExecutionEnvironment that points at the JobManager's RPC endpoint.
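For instance, the CLI can be pointed at a specific JobManager explicitly (a sketch; -m/--jobmanager is the standard flink run option for the target address, which in standalone mode is the host and REST port of the JobManager):

$ ./bin/flink run -m flink01:8081 examples/streaming/WordCount.jar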

JobManager

The JobManager handles job scheduling and resource management for the whole Flink cluster. It receives submitted applications from clients and, based on TaskSlot usage across the cluster's TaskManagers, allocates TaskSlots to each application and instructs the TaskManagers to start it. The JobManager acts as the cluster's master node, and a cluster has exactly one active JobManager in charge of job and resource management. JobManager and TaskManagers communicate through an Actor System, which is how the JobManager collects task execution status and relays it back to the client. During execution the JobManager also triggers checkpoints: each TaskManager performs its part of the checkpoint when it receives the trigger, while all checkpoint coordination happens inside the JobManager. When a job completes, Flink reports the result back to the client and releases the TaskManager resources for the next submission.

TaskManager

The TaskManagers are the cluster's slave nodes, responsible for executing tasks and for requesting and managing the resources those tasks use on each node. The client compiles and packages the Flink application and submits it to the JobManager, which assigns it to TaskManagers with free resources, where it is started and run. A TaskManager receives the tasks to deploy from the JobManager, starts them in its slots, sets up the network connections for data ingestion, and begins processing; data exchange between TaskManagers happens as streaming transfers. Note that Flink executes tasks as multiple threads rather than in separate JVMs the way MapReduce does, which greatly improves CPU utilization; resources are shared among jobs and tasks through TaskSlots, and each TaskManager manages its TaskSlots as a resource pool.
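All of the above can also be inspected without the web UI, via the JobManager's monitoring REST API, which is served on the same port as the dashboard (a sketch; /overview and /taskmanagers are standard endpoints of Flink's REST API):

$ curl http://flink01:8081/overview       # slot totals, job counts, Flink version
$ curl http://flink01:8081/taskmanagers   # registered TaskManagers and their slots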


Job Submission Test

With Flink deployed and its basic components understood, we can submit some of Flink's bundled example jobs to the cluster to check that everything runs properly. The example directory looks like this:
[root@flink01 /usr/local/flink]# ls examples/
batch  gelly  python  streaming  table
[root@flink01 /usr/local/flink]# ls examples/streaming/
IncrementalLearning.jar  Iteration.jar  SessionWindowing.jar  SocketWindowWordCount.jar  StateMachineExample.jar  TopSpeedWindowing.jar  Twitter.jar  WindowJoin.jar  WordCount.jar
[root@flink01 /usr/local/flink]# 

I'm using examples/streaming/SocketWindowWordCount.jar for the test; it reads a socket stream, splits the input into words on a separator, and counts word frequencies. To simulate the socket stream, install the netcat tool:
$ yum install -y nc

Use nc to listen on port 9999; in a moment we'll write data through this socket:
$ nc -lk 9999

Then submit the example job to Flink:
[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID c90a28408eae654a143745903cbaa3eb
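The example reads from localhost by default; it also accepts a --hostname argument, which matters once the socket runs on a different node (a variant of the same submission):

$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname flink01 --port 9999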

Once the job is submitted, the web UI shows it as running:

[Screenshot: running job in the Flink web dashboard]

Click into it to see the details:

[Screenshot: job detail page in the Flink web dashboard]

Write some data into the socket created by nc:
[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

The word-frequency results appear in the following file:
[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-0-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]# 

That completes the test. So how do we stop the job? Rather than simply hitting Ctrl + C, click "Cancel Job" in the web UI to stop it:

[Screenshot: Cancel Job in the Flink web dashboard]
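The same can be done from the command line with the flink CLI's list and cancel subcommands (here reusing the JobID printed at submission):

$ ./bin/flink list                                      # show running jobs and their JobIDs
$ ./bin/flink cancel c90a28408eae654a143745903cbaa3eb   # cancel a job by its JobID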

Flink Standalone Mode Deployment

Official documentation:

The previous section demonstrated Flink's single-node deployment, but production environments usually call for a distributed setup, and Flink provides Standalone mode for that, i.e. a self-contained cluster. The topology of Flink Standalone mode:

[Figure: Flink Standalone mode topology]

To demonstrate a Standalone distributed deployment we need at least two machines, so I've added a second machine with hostname flink02. The current machine setup:

IP                Hostname    Role
192.168.243.148   flink01     master (JobManager) / worker (TaskManager)
192.168.243.150   flink02     worker (TaskManager)

  • Tips: the newly added flink02 also needs a Java runtime environment

System Configuration (All Nodes)

Configure hosts, mapping hostnames to IPs so that all nodes can reach each other by hostname:
$ vim /etc/hosts
192.168.243.148   flink01
192.168.243.150   flink02

Disable the firewall:
$ systemctl stop firewalld && systemctl disable firewalld

Set up passwordless SSH login between all nodes:
[root@flink01 ~]# ssh-keygen -t rsa      # generate a key pair
[root@flink01 ~]# ssh-copy-id flink01    # append the public key to this node's own authorized keys
[root@flink01 ~]# ssh-copy-id flink02    # append the public key to flink02's authorized keys
  • Repeat the same steps on flink02; I won't show them again here

Then test passwordless login; as shown, logging in to flink02 no longer requires a password:
[root@flink01 ~]# ssh flink02
Last login: Tue Sep 29 14:22:20 2020 from 192.168.243.1
[root@flink02 ~]#

Configure the Master Node

On flink01, adjust a few options in the configuration file:
[root@flink01 /usr/local/flink]# vim conf/flink-conf.yaml
jobmanager.rpc.address: flink01
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 2048m
taskmanager.numberOfTaskSlots: 2
parallelism.default: 1
io.tmp.dirs: /usr/local/flink/tmp_data

Create the temporary-data directory:
[root@flink01 /usr/local/flink]# mkdir tmp_data

A quick rundown of these parameters:

  • jobmanager.rpc.address: the IP address or hostname of the master node
  • jobmanager.memory.process.size: the total memory available to the JobManager process
  • taskmanager.memory.process.size: the total memory available to each TaskManager process
  • taskmanager.numberOfTaskSlots: the number of task slots per TaskManager, commonly set to the machine's CPU core count
  • parallelism.default: the default parallelism for jobs that don't specify one
  • io.tmp.dirs: the TaskManager's temporary-data directory
  • See the official Configuration documentation for more on these parameters

The worker nodes' IPs or hostnames also need to be listed; start-cluster.sh will SSH into each host in this file to start a TaskManager:
[root@flink01 /usr/local/flink]# vim conf/workers
flink01
flink02

Restart the services:
[root@flink01 /usr/local/flink]# ./bin/stop-cluster.sh
[root@flink01 /usr/local/flink]# ./bin/start-cluster.sh

Configure the Worker Node

Copy the flink directory over to flink02 by running the following on flink02:
[root@flink02 ~]# scp -r flink01:/usr/local/flink /usr/local/flink

Create the temporary-data directory:
[root@flink02 ~]# cd /usr/local/flink/
[root@flink02 /usr/local/flink]# mkdir tmp_data

Start the TaskManager service:
[root@flink02 /usr/local/flink]# ./bin/taskmanager.sh start
Starting taskexecutor daemon on host flink02.
[root@flink02 /usr/local/flink]# jps
4757 Jps
4701 TaskManagerRunner
[root@flink02 /usr/local/flink]# 
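The matching command to stop a single TaskManager (taskmanager.sh accepts start/stop just like the cluster scripts):

$ ./bin/taskmanager.sh stop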

The dashboard now shows 2 TaskManagers:

[Screenshot: dashboard overview showing 2 Task Managers]

Both nodes also show up on the "Task Managers" page:

[Screenshot: Task Managers page listing both nodes]

Adding more TaskManager nodes works exactly the same way; it's very simple. Next, let's check that jobs submitted to the cluster run properly. First use nc to create a socket and write some data:
[root@flink01 ~]# nc -lk 9999
a b c a a b b d d c
hello world
flink spark spark flink

Then submit the job:
[root@flink01 /usr/local/flink]# ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9999
Job has been submitted with JobID 641d5e7e0bd572ba4114ea5e69b8644c

The word-frequency results in the following file show that the job runs fine on Flink in Standalone mode:
[root@flink01 /usr/local/flink]# cat log/flink-root-taskexecutor-1-flink01.out
a : 3
spark : 2
flink : 2
world : 1
hello : 1
d : 2
c : 2
b : 3
[root@flink01 /usr/local/flink]# 