前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据Flink进阶(十四):Flink On Standalone任务提交

大数据Flink进阶(十四):Flink On Standalone任务提交

原创
作者头像
Lansonli
发布2023-04-08 21:41:22
1.7K0
发布2023-04-08 21:41:22
举报
文章被收录于专栏:Lansonli技术博客Lansonli技术博客

Flink On Standalone任务提交

Flink On Standalone 即Flink任务运行在Standalone集群中,Standlone集群部署时采用Session模式来构建集群,即:首先构建一个Flink集群,Flink集群资源就固定了,所有提交到该集群的Flink作业都运行在这一个集群中,如果集群中提交的任务多资源不够时,需要手动增加节点,所以Flink 基于Standalone运行任务一般用在开发测试或者企业实时业务较少的场景下。

Flink On Standalone 任务提交支持Session会话模式和Application应用模式,不支持Per-Job单作业模式。下面介绍基于Standalone 的Session会话模式和Application应用模式任务提交命令和原理,演示两类任务提交模式的代码还是以上一章节中读取Socket 数据进行实时WordCount统计代码为例,代码如下:

代码语言:javascript
复制
package com.lanson.flinkjava.code.chapter4;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * 读取Socket数据进行实时WordCount统计
 */
public class SocketWordCount {
    public static void main(String[] args) throws Exception {
        //1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取Socket数据
        DataStreamSource<String> ds = env.socketTextStream("node3", 9999);
        //3.准备K,V格式数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            String[] words = line.split(",");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        //4.聚合打印结果
        tupleDS.keyBy(tp -> tp.f0).sum(1).print();

        //5.execute触发执行
        env.execute();
    }
}

将以上代码进行打包,名称为"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3节点上启动socket服务(nc -lk 9999)。

一、Standalone Session模式

1、任务提交命令

在Standalone集群搭建完成后,基于Standalone集群提交Flink任务方式就是使用的Session模式,提交任务之前首先启动Standalone集群($FLINK_HOME/bin/start-cluster.sh),然后再提交任务,Standalone Session模式提交任务命令如下:

代码语言:javascript
复制
[root@node1 ~]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./flink run -m node1:8081 -d -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

以上提交任务的参数解释如下:

参数

解释

-m

--jobmanager,指定提交任务连接的JobManager地址。

-c

--class,指定运行的class主类。

-d

--detached,任务提交后在后台独立运行,退出客户端,也可不指定。

-p

--parallelism,执行程序的并行度。

以上任务提交完成后,我们可以登录Flink WebUI(https://node1:8081)查看启动一个任务:

再次按照以上命令提交Flink任务可以看到集群中会有2个任务,说明Standalone Session模式下提交的所有Flink任务共享集群资源,如下:

以上提交Flink流任务的名称默认为"Flink Streaming Job",也可以通过参数"pipeline.name"来自定义指定Job 名称,提交命令如下:

代码语言:javascript
复制
./flink run -m node1:8081 -d -Dpipeline.name=socket-wc1 -c com.lanson.flinkjava.code.chapter4.SocketWordCount /root/flink-jar-test/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

提交之后,可以看到页面中有三个任务,最后一个任务提交的名称改成了自定义任务名称。

2、任务提交流程

Standalone Session模式提交任务中首先需要创建Flink集群,集群创建启动的同时Dispatcher、JobMaster、ResourceManager对象一并创建、TaskManager也一并启动,TaskManager会向集群ResourceManager汇报Slot信息,Flink集群资源也就确定了。Standalone Session模式提交任务流程如下:

  1. 在客户端提交Flink任务,客户端会将任务转换成JobGraph提交给JobManager。
  2. Dispatcher将提交任务提交给JobMaster。
  3. JobMaster向ResourceManager申请Slot资源。
  4. ResourceManager会在对应的TaskManager上划分Slot资源。
  5. TaskManager向JobMaster offer Slot资源。
  6. JobMaster将任务对应的task发送到TaskManager上执行。

二、Standalone Application模式

1、任务提交命令

Standalone Application模式中不会预先创建Flink集群,在提交Flink 任务的同时会创建JobManager,启动Flink集群,然后需要手动启动TaskManager连接该Flink集群,启动的TaskManager会根据$FLINK_HOME/conf/flink-conf.yaml配置文件中的"jobmanager.rpc.address"配置找JobManager,所以这里选择在node1节点上提交任务并启动JobManager,方便后续其他节点启动TaskManager后连接该节点。Standalone Appliction模式提交任务步骤和命令如下:

1.1、准备Flink jar包

在node1节点上将Flink 打好的"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar"jar包放在 $FLINK_HOME/lib目录下。

1.2、提交任务,在node1 节点上启动 JobManager

代码语言:javascript
复制
cd /software/flink-1.16.0/bin/

代码语言:javascript
复制
#执行如下命令,启动JobManager 
./standalone-job.sh start --job-classname com.lanson.flinkjava.code.chapter4.SocketWordCount

执行以上命令后会自动从$FLINK_HOME/lib中扫描所有jar包,执行指定的入口类。命令执行后可以访问对应的Flink WebUI:https://node1:8081,可以看到提交的任务,但是由于还没有执行TaskManager任务无法执行。

1.3、启动TaskManager

在node1、node2、node3任意一台节点上启动taskManager,根据$FLINK_HOME/conf/flink-conf.yaml配置文件中"jobmanager.rpc.address"配置项会找到对应node1 JobManager。

代码语言:javascript
复制
#在node1节点上启动TaskManager
[root@node1 ~]# cd /software/flink-1.16.0/bin/
[root@node1 bin]# ./taskmanager.sh start

#在node2节点上启动TaskManager
[root@node2 ~]# cd /software/flink-1.16.0/bin/
[root@node2 bin]# ./taskmanager.sh start

启动两个TaskManager后可以看到Flink WebUI中对应的有2个TaskManager,可以根据自己任务使用资源的情况,手动启动多个TaskManager。

1.4、停止集群

代码语言:javascript
复制
#停止启动的JobManager
[root@node1 bin]# ./standalone-job.sh stop

#停止启动的TaskManager
[root@node1 bin]# ./taskmanager.sh stop
[root@node2 bin]# ./taskmanager.sh stop

我们可以以同样的方式在其他节点上以Standalone Application模式提交先的Flink任务,但是每次提交都是当前提交任务独享集群资源。

2、任务提交流程

Standalone Application模式提交任务中提交任务的同时会启动JobManager创建Flink集群,但是需要手动启动TaskManager,这样提交的任务才能正常运行,如果提交的任务使用资源多,还可以启动多个TaskManager。Standalone Application模式提交任务流程如下:

  1. 在客户端提交Flink任务的同时启动JobManager,客户端会将任务转换成JobGraph提交给JobManager。
  2. Dispatcher会启动JobMaster,Dispatcher将提交任务提交给JobMaster。
  3. JobMaster向ResourceManager申请Slot资源。
  4. 手动启动TaskManager,TaskManager会向ResourceManager注册Slot资源
  5. ResourceManager会在对应的TaskManager上划分Slot资源。
  6. TaskManager向JobMaster offer Slot资源。
  7. JobMaster将任务对应的task发送到TaskManager上执行。

Standalone Application模式任务提交流程和Standalone Session模式类似,两者区别主要是Standalone Session模式中启动Flink集群时JobManager、TaskManager、JobMaster会预先启动;Standalone Application模式中提交任务时同时启动集群JobManager、JobMaster,需要手动启动TaskManager。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​Flink On Standalone任务提交
    • 一、Standalone Session模式
      • 1、任务提交命令
      • 2、任务提交流程
    • 二、Standalone Application模式
      • 1、任务提交命令
      • 2、任务提交流程
相关产品与服务
大数据处理套件 TBDS
腾讯大数据处理套件(Tencent Big Data Suite,TBDS)依托腾讯多年海量数据处理经验,基于云原生技术和泛 Hadoop 生态开源技术对外提供的可靠、安全、易用的大数据处理平台。 TBDS可在公有云、私有云、非云化环境,根据不同数据处理需求组合合适的存算分析组件,包括 Hive、Spark、HBase、Flink、presto、Iceberg、Alluxio 等,以快速构建企业级数据湖、数据仓库。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档