
A Detailed Guide to Submitting Flink Jobs with RESTClient

By 阿泽
Published 2019-06-21 21:04:11

Flink offers a rich set of client-side options for submitting jobs. This article builds an extension on the RESTful approach; for the other four approaches, see the client-operations session of the flink-china tutorial series: https://www.bilibili.com/video/av47600600/

Flink has supported job submission through the RESTClient since version 1.7, which many people may not be familiar with. Submitting a job with the RESTClient means, in other words, that Flink can be driven through API calls: you submit the Flink code you have written to a Flink cluster for execution. This article demonstrates reading data from Kafka, with the submission triggered from a Spring Cloud microservice. The concrete steps follow.
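
For context, and as far as I can tell from the Flink documentation of that era: since the FLIP-6 rework (the default execution mode since Flink 1.5), client-side job submission goes through the JobManager's REST endpoint, which listens on port 8081 by default, the same port that serves the web UI. Programmatic submission therefore needs no extra ports beyond the one the dashboard already uses.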

Writing the Flink Program

Create a new Spring Boot parent project and add a flink-service module. Put the following code in flink-service, then package the project. The point of packaging is that the JARs the Flink job needs at runtime are submitted to the cluster together with the job when it is submitted via the RESTClient.

Code (Java):
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromsinkKafka {
  public static void main(String[] args) throws Exception {
    // Create the stream execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka consumer configuration
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    // Read the "flinktest" topic as a stream of strings
    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));

    // Log each record and pass it through unchanged
    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    env.execute();
  }
}
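
A note on the connector: FlinkKafkaConsumer without a version suffix is the universal Kafka connector introduced in Flink 1.7, and it corresponds to the flink-connector-kafka artifact declared in the packaging configuration below. On older Flink versions you would use a version-specific consumer such as FlinkKafkaConsumer011 instead.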

The packaging configuration is shown below: in the pom, remove the default Spring Boot packaging plugin spring-boot-maven-plugin and replace it with the following. Packaged this way, the build produces two JARs, flink-service-1.0-SNAPSHOT-kafka.jar and flink-service-1.0-SNAPSHOT.jar: flink-service-1.0-SNAPSHOT-kafka.jar contains the Flink code you wrote, while flink-service-1.0-SNAPSHOT.jar contains the Kafka base, Kafka client, and other dependency classes your Flink program needs at runtime.

Code (Maven pom.xml):
<build>
  <plugins>
    <!-- unpack the Kafka connector dependencies into target/classes
         so they end up in the default JAR -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-dependency-plugin</artifactId>
      <version>2.9</version>
      <executions>
        <execution>
          <id>unpack</id>
          <phase>prepare-package</phase>
          <goals>
            <goal>unpack</goal>
          </goals>
          <configuration>
            <artifactItems>
              <!-- for kafka connector -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- for kafka base -->
              <artifactItem>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/flink/**</includes>
              </artifactItem>
              <!-- for kafka client -->
              <artifactItem>
                <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.1</version>
                <type>jar</type>
                <overWrite>false</overWrite>
                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                <includes>org/apache/**</includes>
              </artifactItem>
            </artifactItems>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- build the job JAR with a "kafka" classifier and a program-class manifest entry -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <executions>
        <!-- kafka -->
        <execution>
          <id>kafka</id>
          <phase>package</phase>
          <goals>
            <goal>jar</goal>
          </goals>
          <configuration>
            <classifier>kafka</classifier>
            <archive>
              <manifestEntries>
                <program-class>com.flink.kafka.ReadFromsinkKafka</program-class>
              </manifestEntries>
            </archive>
            <includes>
              <include>**/com/flink/kafka/ReadFromsinkKafka.class</include>
            </includes>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
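
After the build (a plain mvn clean package from the parent project is enough), both JARs land under flink-service/target. The program-class entry written into the manifest of the kafka-classified JAR is what tells Flink which class to use as the job's entry point when the JAR is submitted without an explicitly named main class.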

Implementing the RESTful Call

Create a new controller and add the code below. The key to submitting a Flink job via the RESTClient is to connect to the remote Flink environment with createRemoteEnvironment; once you have the remote execution environment, calling env.execute() submits the job to the remote cluster for execution.

Code (Java):
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class FlinkController {

  @RequestMapping(value = "/test", method = RequestMethod.POST)
  public void test() throws Exception {
    // JARs to ship with the job: the job JAR and its Kafka dependencies
    String[] jars = {"flink-service/target/flink-service-1.0-SNAPSHOT-kafka.jar",
        "flink-service/target/flink-service-1.0-SNAPSHOT.jar"};

    // Connect to the remote cluster: host, REST port, parallelism, JAR files
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("192.168.11.11", 8081, 2, jars);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
    properties.setProperty("group.id", "flinksink");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("flinktest",
        new SimpleStringSchema(), properties));

    stream.map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        System.out.println(value);
        return value;
      }
    }).print();

    // execute() builds the job graph and submits it to the remote cluster
    env.execute();
  }
}
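
One caveat to be aware of: for an unbounded streaming job like the Kafka source above, env.execute() blocks until the job terminates, so the POST request will hang for as long as the job keeps running. The sketch below is my own addition, not part of the original article (the AsyncFlinkController class, the /submitAsync path, and the runJob() helper are all hypothetical names); it moves the blocking call onto a background thread so the HTTP request returns immediately.

Code (Java):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class AsyncFlinkController {

  // Single background thread that owns the blocking submission call
  private final ExecutorService submitter = Executors.newSingleThreadExecutor();

  @RequestMapping(value = "/submitAsync", method = RequestMethod.POST)
  public String submitAsync() {
    submitter.submit(() -> {
      try {
        // Build the remote environment and the job topology exactly as in
        // FlinkController.test() above, then call env.execute(); the blocking
        // call now happens off the HTTP request thread.
        runJob();
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
    return "job submitted";
  }

  private void runJob() throws Exception {
    // Same body as test() in the controller above.
  }
}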

Testing the Submission

I have verified that jobs submitted this way run on both a Flink standalone cluster and a YARN cluster. (Figure: screenshot of the job running successfully.)

After writing data into Kafka, the records show up in the Flink logs. (Figure: the consumed records in the Flink log output.)
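
For a quick test you can write a few records with a minimal Java producer; below is a sketch assuming the same broker address and topic as above (the TestProducer class name is mine; the kafka-clients 2.0.1 dependency is already declared in the pom).

Code (Java):
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.11.12:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // Send a single test record to the topic the Flink job consumes
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("flinktest", "hello flink"));
    }
  }
}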

Originally published 2019-05-29 on the WeChat public account Flink实战应用指南.
