根据Flink官网资料,实操CDH5.16.2上配置Flink on yarn,给出了flink on yarn的集成方式和HA的配置方式
1
文档编写目的
2
Flink如何与YARN交互
上图来自Flink官方
Flink YARN Client需要获取Hadoop的配置访问到集群的YARN Resource Manager和HDFS,可以使用如下方式进行配置:
整个交互流程
3
在CDH5上部署Flink
添加HADOOP_CONF_DIR环境变量
vi /etc/profile
# 配置HADOOP_CONF_DIR
export HADOOP_CONF_DIR=/etc/hadoop/conf
# 刷新环境变量
source /etc/profile
配置HADOOP的CLASSPATH
vi /etc/profile
# 配置hadoop的classpath
export HADOOP_CLASSPATH=`hadoop classpath`
# 刷新环境变量
source /etc/profile
添加操作HDFS的用户
编辑/bin目录下的yarn-session.sh文件,配置HADOOP_USER_NAME=hdfs
# 操作hdfs的用户
export HADOOP_USER_NAME=hdfs
per-job提交模式
直接向yarn上提交一个example包下的wordcount任务, per job方式
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
yarn-session提交模式
启动yarn-session
./bin/yarn-session.sh -tm 8192 -s 4
yarn上生成的yarn-session对应的机器是dn2.develop.com:8042
yarn id:application_1589377449274_0645
提交任务到该yarn-session上
./bin/flink run -m yarn-cluster -yid application_1589377449274_0645 ./examples/batch/WordCount.jar
进入yarn-session的web,发现任务执行完成
利用bin下的yarn-session.sh启动yarn-session
./bin/yarn-session.sh
命令行参数:
Usage:
Optional
-at,--applicationType <arg> 设置yarn上应用的自定义程序类型
-D <property=value> 动态参数
-d,--detached 使用分离模式
-h,--help 打出 cli help
-id,--applicationId <arg> 把任务提交到正在运行的yarn session
-j,--jar <arg> flink jar的路径
-jm,--jobManagerMemory <arg> JobMananger内存
-m,--jobmanager <arg> 连接到指定的JobManager
-nl,--nodeLabel <arg> 指定yarn的节点标签
-nm,--name <arg> 指定一个yarn应用的名称
-q,--query 显示可用的yarn资源 cpu + 内存
-qu,--queue <arg> 指定yarn队列
-s,--slots <arg> 每个TaskManager的slot
-t,--ship <arg> 文件发送目录
-tm,--taskManagerMemory <arg> 每个TaskManager容器的内存,默认值mb
-yd,--yarndetached 不建议使用
-z,--zookeeperNamespace <arg> 为高可用性模式创建zk的namespace
生产环境上用得比较多的是flink的per job模式。
在YARN HA情况下,Flink集群不需要多个JobManager实例,当JM出现故障的时候,yarn会尝试重启JM。具体的YARN的操作取决于YARN的版本。
最大的Application Master重试次数
配置集群中yarn-site.xml的最大重试次数,集群的配置一般是2次
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
配置flink应用的重试次数 flink-conf.yaml
yarn.application-attempts: 10
配置高可用的YARN-session
high-availability: zookeeper
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: localhost:2181
state.checkpoints.dir: hdfs:///flink/flink-checkpoints
进入dn3机器,打印jps
直接kill掉yarnsessionclusterentrypoint, kill -9 117533
看到yarn上的attempt id增加, flink的web ui可以重新进行访问了 flink on yarn ha 测试完成