Flink提供了yarn上运行的3模式,分别为
Application Mode
,Session-Cluster
和Per-Job-Cluster
模式。 Yarn 模式会动态申请资源
请注意,客户端需要YARN_CONF_DIR
或HADOOP_CONF_DIR
环境变量来读取YARN和HDFS配置。没配置的话,就默认是 /etc/hadoop/conf
。
在flink1.13以前,lib下是有hadoop相关的jar包的,但是1.13以后被独立出去了,所以需要加上环境变量,可以加在config.sh的开头。
export HADOOP_CLASSPATH=`hadoop classpath`
export YARN_CLASSPATH=`yarn classpath`
相关的属性配置都可以用-D来设置,比如:
应用程序名设置:-Dyarn.application.name=xxx
内存设置:-Dtaskmanager.memory.process.size=4096m
独享集群,提交之后由yarn现启集群。
老版本(<=1.10)
flink run -m yarn-cluster -c xxx xxx.jar
新版本(>=1.11)
flink run -t yarn-per-job -c xxx xxx.jar
提交一个flink job到yarn(Flink on Yarn Per-Job Mode)
该模式下也可以指定--detached
参数,指定了则一旦作业提交被yarn接受,客户端将停止。
./bin/flink run -t yarn-per-job -p 4 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
./examples/batch/WordCount.jar
需要帮助则:
./bin/flink run -help
有啥参数可以使用-D来指定,不要使用Options for yarn-cluster mode:下的那些参数比如-ynm、-yjm、-ytm等,不生效。
flink run -t yarn-per-job \ //指定运行模式
-d -ynm FlinkRetention \ //指定在jobmanager里面显示的名字
-Dyarn.application.name=FlinkRetention \ // 指定在yarn上的application的名字
-c com.bigdata.etl.FlinkRetention // 入口类
/data/bigdata/flink-dw/target/flink-dw.jar \ // 自己任务的jar包
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \ 需要传入的参数
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}
先启动一个集群,再将任务提交到上面
Per-Job-Cluster
的方式。
Session-Cluster
是资源共享,job
太多,可能会出现一些问题,但是Session-Cluster
也并非一无是处,若job类型一致就可以放到一个集群中。
bin/yarn-session.sh -d
-d 以后台的方式启动
luster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1628336315656_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1628336315656_0001
Note that killing Flink might not clean up all job artifacts and temporary files.
以 后台的方式启动:luster has been started in detached mode.
为了优雅地停止Flink,请使用以下命令: $ echo "stop" | ./bin/yarn-session.sh -id application_1628336315656_0001
如果这是不可能的,那么你也可以通过YARN的web界面或通过以下方式杀死Flink:
$ yarn application -kill application_1628336315656_0001
注意,杀死Flink可能不会清除所有作业工件和临时文件。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E3DlQffd-1650721637670)(//upload-images.jianshu.io/upload_images/17367901-bd661d45fb94c466.png)]
应用模式,1.11
之后次才有的,类比 Per-Job-Cluster
,也是独享集群。
就在于解析代码的位置,
<!-- 支持 flink yarn高可用 -->
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<!-- 最大重试次数, 默认值为2 -->
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-yarn
如果 yarn的application,如果开启了高可用,所有的jobid都会变成0000000000000000000;官方的解释,application模式不允许记录多个状态,这样jobid就不好给了。
yarn的session集群,如果开启了高可用,-D应用id参数=appId,指定提交到session集群,可能会有问题。解决方式 :不写 -t yarn-session
# 查看flink提交任务
./bin/flink list
# 查看最近取消的是哪个命令
./bin/flink list -a
参考:https://blog.51cto.com/u_15318160/3248006 https://blog.csdn.net/chanyue123/article/details/110442617 https://www.jianshu.com/p/861bc86465c9 https://www.jianshu.com/p/10f0d4ff50be https://blog.csdn.net/qq_27474277/article/details/116663318