Integrating Flink with Kafka is a very common real-time processing scenario. Since Kafka 0.11 the producer has supported transactions, which lets a Flink-plus-Kafka pipeline achieve full end-to-end exactly-once processing. This does add up to one checkpoint interval of latency, but exactly-once semantics are well worth it. The most visible end-to-end case study is probably the recent one shared by OPPO; follow 浪尖's WeChat public account (bigdatatip) and reply "oppo" to get it.
1. Ways to integrate Flink SQL with Kafka
There are several ways to integrate Flink SQL with Kafka; 浪尖 summarizes them here:
1. Converting a DataStream to a Table
Build a DataStream with the addSource and addSink APIs, register it as a table, and then analyze it with SQL.
The main API comes in two forms:
1. Register directly as a table:

```java
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream);

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, "myLong, myString");
```
2. Convert to a Table:

```java
DataStream<Tuple2<Long, String>> stream = ...

// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);

// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
```
2. TableSource and TableSink
Input and output tables can also be registered directly through the TableSource and TableSink interfaces, for example Kafka010JsonTableSource and Kafka010JsonTableSink.
3. Custom catalog
A custom catalog can also be registered. This approach is not covered here; a video tutorial on it will be posted to the Knowledge Planet (知识星球) group later.
```java
ExternalCatalog catalog = new InMemoryExternalCatalog();

// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog);
```
4. Connector
This is the approach this article explains in detail; the others will be shared step by step in the Knowledge Planet group.
At the moment this approach supports only Kafka, Elasticsearch, and the filesystem.
2. Worked example
Let's go straight to the example, then walk through the details.
```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class kafka2kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        tEnv.connect(
                new Kafka()
                    .version("0.10")    // "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic("jsontest")
                    .property("bootstrap.servers", "mt-mdh.local:9093")
                    .property("group.id", "test")
                    .startFromLatest()
            )
            .withFormat(
                new Json()
                    .failOnMissingField(false)
                    .deriveSchema()
            )
            .withSchema(
                new Schema()
                    .field("rowtime", Types.SQL_TIMESTAMP)
                        .rowtime(new Rowtime()
                            .timestampsFromField("eventtime")
                            .watermarksPeriodicBounded(2000)
                        )
                    .field("fruit", Types.STRING)
                    .field("number", Types.INT)
            )
            .inAppendMode()
            .registerTableSource("source");

        tEnv.connect(
                new Kafka()
                    .version("0.10")    // "0.8", "0.9", "0.10", "0.11", and "universal"
                    .topic("test")
                    .property("acks", "all")
                    .property("retries", "0")
                    .property("batch.size", "16384")
                    .property("linger.ms", "10")
                    .property("bootstrap.servers", "mt-mdh.local:9093")
                    .sinkPartitionerFixed()
            )
            .inAppendMode()
            .withFormat(
                new Json().deriveSchema()
            )
            .withSchema(
                new Schema()
                    .field("fruit", Types.STRING)
                    .field("total", Types.INT)
                    .field("time", Types.SQL_TIMESTAMP)
            )
            .registerTableSink("sink");

        tEnv.sqlUpdate(
            "insert into sink " +
            "select fruit, sum(number), TUMBLE_END(rowtime, INTERVAL '5' SECOND) " +
            "from source " +
            "group by fruit, TUMBLE(rowtime, INTERVAL '5' SECOND)");

        env.execute();
    }
}
```
This example opens tumbling windows on event time and sums number per fruit. As you can see, using the connector takes some effort: there are quite a few configuration options. We break them down below; for the full details, see the official docs (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/connect.html#type-strings).
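To make the windowing in the SQL above concrete, here is a minimal plain-Java sketch (not the Flink API) of how `TUMBLE(rowtime, INTERVAL '5' SECOND)` assigns a record to a window: Flink aligns tumbling windows to the epoch, so a record's window start is its timestamp rounded down to a multiple of the window size. The class and method names here are illustrative only.

```java
public class TumbleSketch {
    // Start of the tumbling window containing `timestamp` (both in ms),
    // assuming non-negative timestamps and epoch-aligned windows.
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - (timestamp % windowSizeMs);
    }

    // TUMBLE_END: the (exclusive) end of that window.
    static long windowEnd(long timestamp, long windowSizeMs) {
        return windowStart(timestamp, windowSizeMs) + windowSizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000L; // INTERVAL '5' SECOND
        // Records at 1s and 4.9s both fall into the window [0s, 5s).
        System.out.println(windowStart(1_000L, size)); // 0
        System.out.println(windowEnd(4_900L, size));   // 5000
        // A record at exactly 5s starts the next window [5s, 10s).
        System.out.println(windowStart(5_000L, size)); // 5000
    }
}
```

So all fruits arriving within the same aligned 5-second span are summed together, and `TUMBLE_END` is what the example writes into the sink's `time` column.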
1. Configuring the data source
```java
.connect(
    new Kafka()
        .version("0.11")    // required: valid connector versions are
                            // "0.8", "0.9", "0.10", "0.11", and "universal"
        .topic("...")       // required: topic name from which the table is read

        // optional: connector specific properties
        .property("zookeeper.connect", "localhost:2181")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "testGroup")

        // optional: select a startup mode for Kafka offsets
        .startFromEarliest()
        .startFromLatest()
        .startFromSpecificOffsets(...)

        // optional: output partitioning from Flink's partitions into Kafka's partitions
        .sinkPartitionerFixed()         // each Flink partition ends up in at-most one Kafka partition (default)
        .sinkPartitionerRoundRobin()    // a Flink partition is distributed to Kafka partitions round-robin
        .sinkPartitionerCustom(MyCustom.class)  // use a custom FlinkKafkaPartitioner subclass
)
```
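The difference between the two built-in sink partitioners is easy to miss. This is a plain-Java sketch of the idea, not Flink's implementation; `subtaskIndex` stands for the Flink parallel subtask writing the record, and `counter` is hypothetical per-subtask round-robin state.

```java
public class PartitionerSketch {
    // sinkPartitionerFixed: every record from one Flink subtask goes to the
    // same Kafka partition, so each subtask hits at most one partition.
    static int fixed(int subtaskIndex, int kafkaPartitions) {
        return subtaskIndex % kafkaPartitions;
    }

    // sinkPartitionerRoundRobin: records from one subtask are spread over
    // all Kafka partitions in turn.
    static int roundRobin(long counter, int kafkaPartitions) {
        return (int) (counter % kafkaPartitions);
    }

    public static void main(String[] args) {
        // With 2 subtasks and 4 partitions, fixed uses only partitions 0 and 1,
        // leaving partitions 2 and 3 empty.
        System.out.println(fixed(0, 4)); // 0
        System.out.println(fixed(1, 4)); // 1
        // Round-robin cycles 0, 1, 2, 3, 0, ...
        for (long c = 0; c < 5; c++) {
            System.out.println(roundRobin(c, 4));
        }
    }
}
```

In short: fixed minimizes connections and keeps per-subtask ordering within one partition, while round-robin balances load when the parallelism is smaller than the partition count.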
2. Data formats
Currently CSV, JSON, and Avro are supported. We have to tell Flink how to map the JSON payload to the table's fields, and there are three ways to do so:
```java
.withFormat(
    new Json()
        .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

        // required: define the schema either by using type information which parses numbers to corresponding types
        .schema(Type.ROW(...))

        // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
        .jsonSchema(
            "{" +
            "  type: 'object'," +
            "  properties: {" +
            "    lon: {" +
            "      type: 'number'" +
            "    }," +
            "    rideTime: {" +
            "      type: 'string'," +
            "      format: 'date-time'" +
            "    }" +
            "  }" +
            "}"
        )

        // or use the table's schema
        .deriveSchema()
)
```
In practice the third option is the most common: the format is derived directly from the table schema we declare.
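With deriveSchema(), the JSON field names must match the table's field names. For the example source above (rowtime extracted from "eventtime", plus "fruit" and "number"), a Kafka message would look roughly like the one built below. The concrete values, and the exact timestamp encoding expected for SQL_TIMESTAMP, are assumptions for illustration and can differ by Flink version.

```java
public class SampleMessage {
    // Builds a hypothetical message matching the example schema:
    // eventtime (timestamp string), fruit (string), number (int).
    static String sample() {
        return "{\"eventtime\":\"2019-01-01 12:00:00\","
             + "\"fruit\":\"apple\","
             + "\"number\":3}";
    }

    public static void main(String[] args) {
        System.out.println(sample());
    }
}
```

With failOnMissingField(false), a message lacking one of these keys yields a null field instead of failing the job.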
3. Schema
Besides the field names and types, the schema can declare time attributes:
```java
.withSchema(
    new Schema()
        .field("MyField1", Types.SQL_TIMESTAMP)
            .proctime()     // optional: declares this field as a processing-time attribute
        .field("MyField2", Types.SQL_TIMESTAMP)
            .rowtime(...)   // optional: declares this field as an event-time attribute
        .field("MyField3", Types.BOOLEAN)
            .from("mf3")    // optional: original field in the input that is referenced/aliased by this field
)
```
4. Update mode of the output
Three update modes are available: append, retract, and upsert.
```java
.connect(...)
    .inAppendMode()     // otherwise: inUpsertMode() or inRetractMode()
```
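The difference matters once a query updates earlier results. This plain-Java sketch (not Flink internals; names are illustrative) shows how a retract stream encodes an update: each change is an add or a retract, and updating an aggregate is sent as a retraction of the old value followed by an addition of the new one.

```java
public class RetractSketch {
    // Replays changes like "+1", "-1", "+4" (add / retract) for one key
    // and returns the current value, or -1 if everything was retracted.
    static int replay(String... changes) {
        int current = -1;
        for (String c : changes) {
            int v = Integer.parseInt(c.substring(1));
            if (c.charAt(0) == '+') {
                current = v;            // add: new current value
            } else if (v == current) {
                current = -1;           // retract: old value withdrawn
            }
        }
        return current;
    }

    public static void main(String[] args) {
        // SUM over values {1, 3}: the query first emits 1, then updates
        // the result to 4 by retracting 1 and adding 4.
        System.out.println(replay("+1", "-1", "+4")); // 4
    }
}
```

Append mode carries only adds, which is why the windowed query in the example can use it: each window's result is final when the window fires and is never updated afterwards.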
5. Time-related configuration
When declaring the schema you can configure time attributes such as event time and processing time, as well as watermark generation, including custom watermarks.
For event time, timestamp extraction supports:
```java
// Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
.rowtime(
    new Rowtime()
        .timestampsFromField("ts_field")    // required: original field name in the input
)

// Converts the assigned timestamps from a DataStream API record into the rowtime attribute
// and thus preserves the assigned timestamps from the source.
// This requires a source that assigns timestamps (e.g., Kafka 0.10+).
.rowtime(
    new Rowtime()
        .timestampsFromSource()
)

// Sets a custom timestamp extractor to be used for the rowtime attribute.
// The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
.rowtime(
    new Rowtime()
        .timestampsFromExtractor(...)
)
```
The supported watermark generation strategies are:
```java
// Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
// observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
// are not late.
.rowtime(
    new Rowtime()
        .watermarksPeriodicAscending()
)

// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
    new Rowtime()
        .watermarksPeriodicBounded(2000)    // delay in milliseconds
)

// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
    new Rowtime()
        .watermarksFromSource()
)
```
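The arithmetic behind the first two strategies is simple enough to sketch in plain Java (not Flink internals; names are illustrative): ascending emits the maximum observed timestamp minus 1, and bounded emits the maximum observed timestamp minus the configured delay, which is exactly what watermarksPeriodicBounded(2000) does in the example above.

```java
public class WatermarkSketch {
    // watermarksPeriodicAscending: rows at the max timestamp are not late.
    static long ascendingWatermark(long maxSeenTimestamp) {
        return maxSeenTimestamp - 1;
    }

    // watermarksPeriodicBounded(delayMs): tolerates out-of-orderness up to delayMs.
    static long boundedWatermark(long maxSeenTimestamp, long delayMs) {
        return maxSeenTimestamp - delayMs;
    }

    public static void main(String[] args) {
        // After seeing a record at t = 10_000 ms with a 2-second bound,
        // the watermark is 8_000 ms: records with earlier timestamps are late.
        System.out.println(boundedWatermark(10_000L, 2_000L)); // 8000
        System.out.println(ascendingWatermark(10_000L));       // 9999
    }
}
```

In the worked example this means a 5-second window fires only once the watermark passes its end, i.e. about 2 seconds after the first record with a timestamp beyond the window boundary arrives.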
3. Summary
This article covered the various ways to combine Flink SQL with Kafka. For DataStream-level operations, addSource and addSink are the usual choice. Since KafkaJsonTableSource and KafkaJsonTableSink are being phased out, anyone adopting Flink should prefer the connector or catalog approaches; the latter in particular has proven very dependable and convenient when building a platform.
For more Flink content, join 浪尖's Knowledge Planet group and learn together with 750+ members.