
Important | The ins and outs of integrating Flink SQL with Kafka

Author: Spark学习技巧
Published 2019-06-03

Integrating Flink with Kafka is a very common real-time processing scenario. Since Kafka 0.11, the producer has supported transactions, which lets a Flink-plus-Kafka pipeline achieve true end-to-end exactly-once processing. This adds data latency of up to one checkpoint interval, but exactly-once semantics are still very attractive. A visible end-to-end use case is the case study OPPO shared a while ago; follow 浪尖's WeChat public account (bigdatatip) and reply "oppo" to get it.

1. How Flink SQL integrates with Kafka

There are several ways to integrate Flink SQL with Kafka; 浪尖 summarizes them here:

1. DataStream to Table

Use the addSource and addSink APIs to build a DataStream, register it as a table, and then analyze it with SQL; a full sketch follows the two snippets below.

The main interfaces come in two forms:

1. Register directly as a table:

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");

2. Convert to a Table:

DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
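To make this concrete, here is a minimal end-to-end sketch of approach 1, assuming Flink 1.7-era APIs; the topics ("input", "output"), the broker address, and the word-count query are placeholders invented for illustration:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DataStreamToTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");

        // addSource: read raw strings from Kafka and turn each line into (word, 1)
        DataStream<Tuple2<String, Integer>> stream = env
                .addSource(new FlinkKafkaConsumer010<>("input", new SimpleStringSchema(), props))
                .map(line -> Tuple2.of(line, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // register the DataStream as table "words" with fields "word", "cnt"
        tEnv.registerDataStream("words", stream, "word, cnt");

        // run SQL on the registered table
        Table result = tEnv.sqlQuery("SELECT word, SUM(cnt) AS total FROM words GROUP BY word");

        // addSink: the aggregation emits updates, so convert to a retract stream,
        // keep only insert/update messages (f0 == true) and write them to Kafka
        tEnv.toRetractStream(result, Row.class)
                .filter(t -> t.f0)
                .map(t -> t.f1.toString())
                .returns(Types.STRING)
                .addSink(new FlinkKafkaProducer010<>("localhost:9092", "output", new SimpleStringSchema()));

        env.execute("datastream-to-table-sketch");
    }
}

Note that the grouped aggregation produces an updating result, which is why the sketch goes through toRetractStream before writing back to Kafka.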

2. TableSource and TableSink

Through the TableSource and TableSink interfaces you can also register input and output tables directly, for example with Kafka010JsonTableSource and Kafka010JsonTableSink, as in the sketch below.
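A minimal sketch of this approach, assuming the Flink 1.7-era builder API; the topics, broker address, and field names are placeholders chosen for illustration, and tEnv is a StreamTableEnvironment as in the full example later in this article:

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSource;
import org.apache.flink.table.api.TableSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");

// input table: parse JSON records from topic "jsontest" into rows (fruit, number)
Kafka010JsonTableSource source = Kafka010JsonTableSource.builder()
        .forTopic("jsontest")
        .withKafkaProperties(props)
        .withSchema(TableSchema.builder()
                .field("fruit", Types.STRING)
                .field("number", Types.INT)
                .build())
        .build();
tEnv.registerTableSource("source", source);

// output table: write JSON rows (fruit, total) to topic "test"
Kafka010JsonTableSink sink = new Kafka010JsonTableSink("test", props);
tEnv.registerTableSink("sink",
        new String[]{"fruit", "total"},
        new TypeInformation<?>[]{Types.STRING, Types.INT},
        sink);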

3. Custom catalog

Tables can also be registered through a custom catalog. That approach is not covered in detail here; a video tutorial will be posted to Knowledge Planet (知识星球) later.

ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
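Once registered, tables inside the catalog are referenced by their full path. A minimal, hypothetical usage line, assuming a table "myTable" exists under a database "db" in the catalog (the snippet above does not create one):

// hypothetical path: catalog "InMemCatalog", database "db", table "myTable"
Table t = tableEnv.scan("InMemCatalog", "db", "myTable");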

4. Connector approach

This is the approach this article explains in depth; the remaining ones will be shared inside Knowledge Planet.

At the moment this approach only supports Kafka, Elasticsearch, and the filesystem.

2. A worked example

Let's go straight to the example and then walk through the details.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("jsontest")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .property("group.id", "test")
                        .startFromLatest())
                .withFormat(
                        new Json()
                                .failOnMissingField(false)
                                .deriveSchema())
                .withSchema(
                        new Schema()
                                .field("rowtime", Types.SQL_TIMESTAMP)
                                .rowtime(new Rowtime()
                                        .timestampsFromField("eventtime")
                                        .watermarksPeriodicBounded(2000))
                                .field("fruit", Types.STRING)
                                .field("number", Types.INT))
                .inAppendMode()
                .registerTableSource("source");

        tEnv.connect(
                new Kafka()
                        .version("0.10")
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
                        .topic("test")
                        .property("acks", "all")
                        .property("retries", "0")
                        .property("batch.size", "16384")
                        .property("linger.ms", "10")
                        .property("bootstrap.servers", "mt-mdh.local:9093")
                        .sinkPartitionerFixed())
                .inAppendMode()
                .withFormat(
                        new Json().deriveSchema())
                .withSchema(
                        new Schema()
                                .field("fruit", Types.STRING)
                                .field("total", Types.INT)
                                .field("time", Types.SQL_TIMESTAMP))
                .registerTableSink("sink");

        tEnv.sqlUpdate("insert into sink select fruit,sum(number),TUMBLE_END(rowtime, INTERVAL '5' SECOND) from source group by fruit,TUMBLE(rowtime, INTERVAL '5' SECOND)");
        env.execute();
    }
}

This example opens tumbling windows on event time and sums number per fruit. As you can see, the connector approach still takes some work: there are quite a few configuration items, so we break them down below. Full details are in the official documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings).
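For orientation, and assuming a payload shape implied by the declared schema rather than taken from the article: the records on topic jsontest would be JSON objects with fruit, number, and eventtime fields, such as {"fruit": "apple", "number": 2, "eventtime": ...}, and every five seconds the job writes one row per fruit, carrying the window sum and window end time, to topic test.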

1. Configuring the data source

.connect(
  new Kafka()
    .version("0.11")    // required: valid connector versions are
                        //   "0.8", "0.9", "0.10", "0.11", and "universal"
    .topic("...")       // required: topic name from which the table is read

    // optional: connector specific properties
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
    .property("group.id", "testGroup")

    // optional: select a startup mode for Kafka offsets
    .startFromEarliest()
    .startFromLatest()
    .startFromSpecificOffsets(...)

    // optional: output partitioning from Flink's partitions into Kafka's partitions
    .sinkPartitionerFixed()         // each Flink partition ends up in at-most one Kafka partition (default)
    .sinkPartitionerRoundRobin()    // a Flink partition is distributed to Kafka partitions round-robin
    .sinkPartitionerCustom(MyCustom.class)    // use a custom FlinkKafkaPartitioner subclass
)
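For illustration, here is a hedged sketch of the startFromSpecificOffsets(...) variant, assuming the Flink 1.7 descriptor API; the partition numbers and offsets are arbitrary values chosen for the example:

// requires: import java.util.HashMap; import java.util.Map;
Map<Integer, Long> offsets = new HashMap<>();
offsets.put(0, 42L);    // partition 0 starts reading at offset 42
offsets.put(1, 300L);   // partition 1 starts reading at offset 300

new Kafka()
    .version("0.10")
    .topic("jsontest")
    .property("bootstrap.servers", "localhost:9092")
    .startFromSpecificOffsets(offsets);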

2. Data formats

Three formats are currently supported: CSV, JSON, and Avro. For JSON we must tell Flink how to derive the table fields from the JSON records. There are three ways to do this:

.withFormat(
  new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Types.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "  type: 'object'," +
      "  properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "  }" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)

In practice the third option is the most common: derive the format directly from the table schema we declare. If you prefer explicit type information (the first option), it can look like the sketch below.
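For completeness, a sketch of the first option with explicit type information, reusing the lon/rideTime fields from the JSON-schema snippet above (Types is org.apache.flink.api.common.typeinfo.Types):

.withFormat(
  new Json()
    // a named row type; parses "lon" as DOUBLE and "rideTime" as SQL_TIMESTAMP
    .schema(Types.ROW_NAMED(
        new String[]{"lon", "rideTime"},
        Types.DOUBLE, Types.SQL_TIMESTAMP))
)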

3. Schema information

Besides the field names and types, the schema can also carry time-attribute configuration.

.withSchema(
  new Schema()
    .field("MyField1", Types.SQL_TIMESTAMP)
      .proctime()      // optional: declares this field as a processing-time attribute
    .field("MyField2", Types.SQL_TIMESTAMP)
      .rowtime(...)    // optional: declares this field as an event-time attribute
    .field("MyField3", Types.BOOLEAN)
      .from("mf3")     // optional: original field in the input that is referenced/aliased by this field
)

4. Update modes for the output

The update modes are append mode, retract mode, and upsert mode. The windowed aggregation in our example can run in append mode because each tumbling window emits its final result exactly once; a continuous, non-windowed aggregation would need retract or upsert mode instead.

.connect(...)
  .inAppendMode()    // otherwise: inUpsertMode() or inRetractMode()

5. Time-related configuration

When configuring the schema you can declare time attributes, that is, event time or processing time, and configure how watermarks are generated, including fully custom watermark strategies.

For event time, timestamp extraction supports:

// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
  new Rowtime()
    .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
  new Rowtime()
    .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
  new Rowtime()
    .timestampsFromExtractor(...)
)

Watermark generation supports the following strategies. As a concrete example, watermarksPeriodicBounded(2000) means that a record with event time 12:00:10 advances the watermark to 12:00:08, so records arriving up to 2 seconds out of order still land in their windows.

// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
  new Rowtime()
    .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
  new Rowtime()
    .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
  new Rowtime()
    .watermarksFromSource()
)

3. Summary

This article walked through the ways Flink SQL can be combined with Kafka. For DataStream-style jobs, the addSource/addSink approach generally works. KafkaJsonTableSource and KafkaJsonTableSink are gradually being deprecated, so prefer the connector and catalog approaches; the latter in particular has proven reliable and convenient when building a data platform.

For more Flink content, you are welcome to join 浪尖's Knowledge Planet (知识星球) and learn together with 750+ members.

Originally published 2019-05-30 on the WeChat public account 浪尖聊大数据.
