Flink offers a rich set of client-side ways to submit jobs. This article builds on the RESTful approach; for the other four approaches, see the flink-china tutorial series episode on client operations: https://www.bilibili.com/video/av47600600/
Flink has supported job submission through RESTClient since version 1.7, which may be unfamiliar to many. Submitting with RESTClient means Flink can be driven by API calls: you programmatically submit your compiled Flink code to a Flink cluster, where it runs. This article demonstrates a job that reads from Kafka and is launched through a Spring Cloud microservice; the concrete steps follow.
Writing the Flink Program
Create a Spring Boot parent project and add a flink-service module. Put the following code in flink-service, then package the project. The purpose of packaging is to bundle the jars the Flink job needs at runtime, so that when you submit the job with RESTClient they are shipped to the cluster together with it.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReadFromsinkKafka {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
        properties.setProperty("group.id", "flinksink");

        // Read the "flinktest" topic as a stream of strings
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("flinktest", new SimpleStringSchema(), properties));

        // Log each record and pass it through unchanged
        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        }).print();

        env.execute();
    }
}
The packaging configuration is shared below. Remove the default Spring Boot packaging plugin spring-boot-maven-plugin from the pom and replace it with the following. Packaging this way (e.g., with mvn clean package) produces two jars: flink-service-1.0-SNAPSHOT-kafka.jar contains the Flink code you wrote, while flink-service-1.0-SNAPSHOT.jar contains the jars your Flink program needs at runtime, such as the Kafka base and client dependencies.
<build>
    <plugins>
        <!-- Unpack the Kafka connector dependencies into target/classes
             so they end up inside the dependency jar -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>2.9</version>
            <executions>
                <execution>
                    <id>unpack</id>
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>unpack</goal>
                    </goals>
                    <configuration>
                        <artifactItems>
                            <!-- for kafka connector -->
                            <artifactItem>
                                <groupId>org.apache.flink</groupId>
                                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                                <version>${flink.version}</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/flink/**</includes>
                            </artifactItem>
                            <!-- for kafka base -->
                            <artifactItem>
                                <groupId>org.apache.flink</groupId>
                                <artifactId>flink-connector-kafka-base_${scala.binary.version}</artifactId>
                                <version>${flink.version}</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/flink/**</includes>
                            </artifactItem>
                            <!-- for kafka client -->
                            <artifactItem>
                                <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
                                <groupId>org.apache.kafka</groupId>
                                <artifactId>kafka-clients</artifactId>
                                <version>2.0.1</version>
                                <type>jar</type>
                                <overWrite>false</overWrite>
                                <outputDirectory>${project.build.directory}/classes</outputDirectory>
                                <includes>org/apache/**</includes>
                            </artifactItem>
                        </artifactItems>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Build a second jar (classifier "kafka") containing only the job class -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <executions>
                <!-- kafka -->
                <execution>
                    <id>kafka</id>
                    <phase>package</phase>
                    <goals>
                        <goal>jar</goal>
                    </goals>
                    <configuration>
                        <classifier>kafka</classifier>
                        <archive>
                            <manifestEntries>
                                <program-class>com.flink.kafka.ReadFromsinkKafka</program-class>
                            </manifestEntries>
                        </archive>
                        <includes>
                            <include>**/com/flink/kafka/ReadFromsinkKafka.class</include>
                        </includes>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Implementing the RESTful Invocation
Create a controller and add the following code. The key to submitting a Flink job with RESTClient is the createRemoteEnvironment method, which connects to a remote Flink environment; once you hold that execution environment, calling env.execute() submits the job to the remote cluster for execution.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("flink")
public class FlinkController {

    @RequestMapping(value = "/test", method = RequestMethod.POST)
    public void test() throws Exception {
        // Jars to ship to the cluster: the job jar and its dependency jar
        String[] jars = {"flink-service/target/flink-service-1.0-SNAPSHOT-kafka.jar",
                "flink-service/target/flink-service-1.0-SNAPSHOT.jar"};

        // Connect to the remote Flink cluster (JobManager at 192.168.11.11:8081, parallelism 2)
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("192.168.11.11", 8081, 2, jars);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.11.12:9092");
        properties.setProperty("group.id", "flinksink");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("flinktest", new SimpleStringSchema(), properties));

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                System.out.println(value);
                return value;
            }
        }).print();

        // execute() ships the jars and submits the job to the remote cluster
        env.execute();
    }
}
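To trigger the submission, send an empty POST request to the endpoint. Below is a minimal sketch using plain java.net.HttpURLConnection; the host and port of the Spring Boot service (localhost:8080) are assumptions, so adjust them to your deployment.

import java.net.HttpURLConnection;
import java.net.URL;

public class SubmitJobClient {
    public static void main(String[] args) throws Exception {
        // Assumed address of the Spring Boot service; change as needed
        URL url = new URL("http://localhost:8080/flink/test");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.getOutputStream().close(); // empty POST body

        // 200 means the controller method ran and the job was submitted
        System.out.println("Response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}

Note that env.execute() on a remote environment blocks until the job terminates, so for a long-running streaming job like this one the HTTP request will not return promptly; in practice you may want to run the submission on a separate thread or executor.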
Submitting and Testing
I have verified that jobs submitted this way run on both a Flink standalone cluster and a YARN cluster; screenshots of successful runs are shown below.
After writing data into Kafka, the records show up in the Flink logs, as shown in the screenshot below.
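To generate test input, you can write a few messages to the flinktest topic. Below is a minimal sketch using the kafka-clients producer API, with the topic and broker address taken from the job configuration above; the kafka-console-producer shell tool would work equally well.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker the Flink job consumes from
        props.setProperty("bootstrap.servers", "192.168.11.12:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Write to the "flinktest" topic that the job subscribes to
                producer.send(new ProducerRecord<>("flinktest", "test-message-" + i));
            }
            producer.flush();
        }
    }
}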