首页
学习
活动
专区
圈层
工具
发布
29 篇文章
1
Flink 1.14.0 内存优化你不懂?跟着土哥走就对了(万字长文+参数调优)
2
Flink SQL 优化实战 - 维表 JOIN 优化
3
六大方法彻底解决Flink Table & SQL维表Join
4
flink sql 知其所以然(十二):流 join 很难嘛???(上)
5
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
6
Flink重点难点:内存模型与内存结构
7
flink sql 知其所以然(十三):流 join 很难嘛???(下)
8
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join
9
Flink SQL 知其所以然(二十六):2w 字详述 Join 操作(大威天龙)
10
【Flink】第二篇:维表Join之版本表
11
Flink SQL Kafka Connector
12
Apache-Flink深度解析-Temporal-Table-JOIN
13
【Flink】第二十三篇:join 之 temporal join
14
Flink企业级优化全面总结(3万字长文,15张图)
15
Flink源码走读(二):Flink+Kafka实现端到端Exactly Once语义
16
Flink SQL 双表 JOIN 介绍与原理简析
17
【Flink】第十篇:join 之 regular join
18
数据同步工具之FlinkCDC/Canal/Debezium对比
19
Flink-Kafka性能压测全记录
20
聊聊flink的CheckpointedFunction
21
Google Aviator——轻量级 Java 表达式引擎实战
22
【Flink】第八篇:Flink 内存管理
23
Flink内存配置指南
24
从理论到工程实践——用户画像入门宝典
25
【万字长文】详解Flink作业提交流程
26
[1131]Flink(1.13)命令行提交Job
27
[1101]flink常用参数说明
28
flink on yarn 模式下提示yarn资源不足问题分析
29
Flink流量控制与反压机制完全总结
清单首页Flink文章详情

[1131]Flink(1.13)命令行提交Job

文章目录

Flink提供了yarn上运行的3模式,分别为Application Mode, Session-ClusterPer-Job-Cluster模式。 Yarn 模式会动态申请资源

请注意,客户端需要YARN_CONF_DIRHADOOP_CONF_DIR环境变量来读取YARN和HDFS配置。没配置的话,就默认是 /etc/hadoop/conf

在flink1.13以前,lib下是有hadoop相关的jar包的,但是1.13以后被独立出去了,所以需要加上环境变量,可以加在config.sh的开头。

代码语言:javascript
复制
export HADOOP_CLASSPATH=`hadoop classpath`
export YARN_CLASSPATH=`yarn classpath`

相关的属性配置都可以用-D来设置,比如: 应用程序名设置:-Dyarn.application.name=xxx 内存设置:-Dtaskmanager.memory.process.size=4096m

Per-Job-Cluster新老版本启动方法

独享集群,提交之后由yarn现启集群。

老版本(<=1.10)

代码语言:javascript
复制
flink run -m yarn-cluster -c xxx xxx.jar

新版本(>=1.11)

代码语言:javascript
复制
flink run -t yarn-per-job -c xxx xxx.jar

提交一个flink job到yarn(Flink on Yarn Per-Job Mode) 该模式下也可以指定--detached参数,指定了则一旦作业提交被yarn接受,客户端将停止。

代码语言:javascript
复制
./bin/flink run -t yarn-per-job -p 4 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \ 
./examples/batch/WordCount.jar

需要帮助则:

代码语言:javascript
复制
./bin/flink run -help

有啥参数可以使用-D来指定,不要使用Options for yarn-cluster mode:下的那些参数比如-ynm、-yjm、-ytm等,不生效。

  • 提交样例
代码语言:javascript
复制
flink  run -t yarn-per-job \   //指定运行模式
-d -ynm FlinkRetention \   //指定在jobmanager里面显示的名字
-Dyarn.application.name=FlinkRetention \    // 指定在yarn上的application的名字
-c com.bigdata.etl.FlinkRetention   // 入口类
 /data/bigdata/flink-dw/target/flink-dw.jar \   //  自己任务的jar包
--consumer.bootstrap.servers ${consumer_bootstrap_servers} \  需要传入的参数
--producer.bootstrap.servers ${producer_bootstrap_servers} \
--retentionGroupId ${retentionGroupId} \
--flinkSeconds ${flinkSeconds} \
--redis.ip ${redis_ip} \
--redis.port ${redis_port} \
--redis.password ${redis_password}

Session-Cluster

先启动一个集群,再将任务提交到上面

Per-Job-Cluster 与 Session-Cluster 区别

  • Per-Job-Cluster : 1.多少个job就有多少个集群
  • Session-Cluster : 1.会首先在flink上启一个集群,所有的job都交给一个集群中。 2.各个应用之间共享资源。 3.适用于小任务量使用,大任务不太适用。
  • 工作中推荐使用哪一种? 官方推挤,大家推荐 使用Per-Job-Cluster的方式。 Session-Cluster 是资源共享,job太多,可能会出现一些问题,但是Session-Cluster也并非一无是处,若job类型一致就可以放到一个集群中。
  • 启动
代码语言:javascript
复制
bin/yarn-session.sh -d

-d 以后台的方式启动

  • 启动日志
代码语言:javascript
复制
luster has been started in detached mode. In order to stop Flink gracefully, use the following command:
$ echo "stop" | ./bin/yarn-session.sh -id application_1628336315656_0001
If this should not be possible, then you can also kill Flink via YARN's web interface or via:
$ yarn application -kill application_1628336315656_0001
Note that killing Flink might not clean up all job artifacts and temporary files.
  • 日志说明
代码语言:javascript
复制
以 后台的方式启动:luster has been started in detached mode.

为了优雅地停止Flink,请使用以下命令: $ echo "stop" | ./bin/yarn-session.sh -id application_1628336315656_0001

如果这是不可能的,那么你也可以通过YARN的web界面或通过以下方式杀死Flink:
$ yarn application -kill application_1628336315656_0001

注意,杀死Flink可能不会清除所有作业工件和临时文件。
  • yarn

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E3DlQffd-1650721637670)(//upload-images.jianshu.io/upload_images/17367901-bd661d45fb94c466.png)]

Application

应用模式,1.11 之后次才有的,类比 Per-Job-Cluster,也是独享集群。

Per-Job-Cluster 与 Application 区别

就在于解析代码的位置,

  • Per-Job-Cluster: 解析main方法是在提交节点的本地。
  • Application : 解析main方法是在Master。

高可用配置

  • 更改yarn配置 在yarn-site.xml中配置
代码语言:javascript
复制
    <!-- 支持 flink yarn高可用 -->
    <property>
        <name>yarn.resourcemanager.am.max-attempts</name>
        <!-- 最大重试次数, 默认值为2 -->
        <value>4</value>
        <description>
                The maximum number of application master execution attempts.
        </description>
    </property>
  • 更改flink配置 在flink-conf.yaml中配置
代码语言:javascript
复制
yarn.application-attempts: 3
high-availability: zookeeper
high-availability.storageDir: hdfs://hadoop162:8020/flink/yarn/ha
high-availability.zookeeper.quorum: hadoop162:2181,hadoop163:2181,hadoop164:2181
high-availability.zookeeper.path.root: /flink-yarn

如果 yarn的application,如果开启了高可用,所有的jobid都会变成0000000000000000000;官方的解释,application模式不允许记录多个状态,这样jobid就不好给了。

yarn的session集群,如果开启了高可用,-D应用id参数=appId,指定提交到session集群,可能会有问题。解决方式 :不写 -t yarn-session

Yarn 高可用和Standalone 高可用区别

  • Standalone 高可用 同时启动多个JobManger,若执行任务的JobManger挂了,其他JobManager马上补上。
  • Yarn 高可用 JobManager只有一个,但是配置了重试次数,挂了之后再重启(重启一个新的ApplicationManager),利用了ApplicationMaster的重启机制。若在一定时间范围内重试都启动不了,那么就真的挂了。如30秒内重试3次,若重启成功,则重试次数清零。

查看flink提交任务

代码语言:javascript
复制
# 查看flink提交任务
./bin/flink list

# 查看最近取消的是哪个命令
./bin/flink list -a

参考:https://blog.51cto.com/u_15318160/3248006 https://blog.csdn.net/chanyue123/article/details/110442617 https://www.jianshu.com/p/861bc86465c9 https://www.jianshu.com/p/10f0d4ff50be https://blog.csdn.net/qq_27474277/article/details/116663318

下一篇
举报
领券