FlinkKafkaConsumer08可以消费一个或多个Kafka topic的数据,它的构造器需要接收以下参数:
1. topic名或 topic名的列表
2. 反序列化约束,以便于Flink决定如何反序列化从Kafka获得的数据
3. Kafka consumer的属性配置,下面两个属性配置是必须的:
示例代码:
以下几个参数是需要我们重点关注的。
1
反序列化shema
Flink Kafka Consumer 需要知道如何将来自Kafka的二进制数据转换为Java/Scala对象。DeserializationSchema接口允许程序员指定这个序列化的实现。该接口的 T deserialize(byte[]message) 会在收到每一条Kafka的消息的时候被调用。
我们通常会实现 AbstractDeserializationSchema,它可以描述被序列化的Java/Scala类型到Flink的类型(TypeInformation)的映射。如果用户的代码实现了DeserializationSchema,那么就需要自己实现getProducedType(...) 方法。
为了方便使用,Flink提供了一些已实现的schema:
1. TypeInformationSerializationSchema (andTypeInformationKeyValueSerializationSchema) ,他们会基于Flink的TypeInformation来创建schema。这对于那些从Flink写入,又从Flink读出的数据是很有用的。这种Flink-specific的反序列化会比其他通用的序列化方式带来更高的性能。
2. JsonDeserializationSchema (andJSONKeyValueDeserializationSchema) 可以把序列化后的Json反序列化成ObjectNode,ObjectNode可以通过objectNode.get(“field”).as(Int/String/…)() 来访问指定的字段。
3. SimpleStringSchema可以将消息反序列化为字符串。当我们接收到消息并且反序列化失败的时候,会出现以下两种情况: 1) Flink从deserialize(..)方法中抛出异常,这会导致job的失败,然后job会重启;2) 在deserialize(..) 方法出现失败的时候返回null,这会让Flink Kafka consumer默默的忽略这条消息。请注意,如果配置了checkpoint 为enable,由于consumer的失败容忍机制,失败的消息会被继续消费,因此还会继续失败,这就会导致job被不断自动重启。
2
Kafka Consumers 起始offset配置
FlinkKafkaConsumer 允许我们配置Kafka partition被消费的offset的起始位,示例代码如下:
所有版本的Flink KafkaConsumer都支持以上的配置,下面对这些配置进行详细的说明:
当然,也可以指定具体的某个offset作为某个partition的起始消费位置:
上述的代码配置了myTopic的partition 0,1,2在被Flink job消费的起始位置。假设myTopic总共有5个partition,那么剩下的两个partition没有被配置具体的offset的起始位,所以Flink会对这两个partition的采用默认的offset起始位的配置(setStartFromGroupOffsets)。
注意,如果你在这个job中配置了enableCheckpointing() 或者从某个savepoint来启动这个job,那么起始位会优先从savepoint或者checkpoint中获取。
3
容错机制
当Flink的job开启了checkpoint的时候,Flink会一边消费topic的数据,一边定时的将offset和其他operator的状态记录到checkpoint中。如果遇到了job失败的情况,那么Flink将会重启job,从最后一个checkpoint中来恢复job的所有状态,然后从checkpoint中记录的offset开始重新对Kafka 的topic进行消费。记录offset的间隔决定了程序在失败的情况下需要回溯的最大程度。
为了使用Flink Kafkaconsumer的容错机制,我们需要在程序中作如下的配置:
还有一点需要注意的是,Flink只有在task slot的数量足够的情况下才可以成功的重启job,所以如果job是因为TaskManager down掉(或者无法连接到集群)导致task slot不足而失败,那么必须要恢复增加足够的task slot才能让job重启。而Flink on YARN 支持自动的重启丢失的YARN containers。
4
offset提交行为的配置
Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。