当Flink遇到Kafka-FlinkKafkaConsumer使用详解

Flink是新一代的流处理计算引擎。通过轻量级的checkpoint,Flink可以在高吞吐量的情况下保证exactly-once(这需要数据源能够提供回溯消费的能力)。Flink支持众多的source(从中读取数据)和sink(向其写入数据),列表如下:

Kafka作为目前非常流行的消息中间件,它不仅能够提供极大的吞吐量,还能够配合Flink在消费端达到exactly-once。

本文将详细介绍如何配置Flink读取Kafka,运行机制和exactly-once是如何保证的,最后,还会给出监控Flink消费Kafka的方案。(注: 本文的使用的是Flink 1.3.1-release和 Kafka 0.8)

Flink 是通过Connector与具体的source 和 sink进行通信的,具体到Kafka 0.8,相应的Connector是 FlinkKafkaConsumer08和FlinkKafkaProducer08。

我们首先介绍FlinkKafkaConsumer08的配置:

一、 Kafka Consumer的配置

FlinkKafkaConsumer08可以消费一个或多个Kafka topic的数据,它的构造器需要接收以下参数:

1. topic名或 topic名的列表

2. 反序列化约束,以便于Flink决定如何反序列化从Kafka获得的数据

3. Kafka consumer的属性配置,下面两个属性配置是必须的:

· “zookeeper.connect” (Zookeeper servers的地址列表,以逗号分隔)

· “group.id” (consumer group)

· “bootstrap.servers” (Kafka brokers的地址列表,以逗号分隔)

示例代码:

以下几个参数是需要我们重点关注的。

(一) 反序列化shema

Flink Kafka Consumer 需要知道如何将来自Kafka的二进制数据转换为Java/Scala对象。DeserializationSchema接口允许程序员指定这个序列化的实现。该接口的 T deserialize(byte[]message) 会在收到每一条Kafka的消息的时候被调用。

我们通常会实现 AbstractDeserializationSchema,它可以描述被序列化的Java/Scala类型到Flink的类型(TypeInformation)的映射。如果用户的代码实现了DeserializationSchema,那么就需要自己实现getProducedType(...) 方法。

为了方便使用,Flink提供了一些已实现的schema:

1. TypeInformationSerializationSchema (andTypeInformationKeyValueSerializationSchema) ,他们会基于Flink的TypeInformation来创建schema。这对于那些从Flink写入,又从Flink读出的数据是很有用的。这种Flink-specific的反序列化会比其他通用的序列化方式带来更高的性能。

2. JsonDeserializationSchema (andJSONKeyValueDeserializationSchema) 可以把序列化后的Json反序列化成ObjectNode,ObjectNode可以通过objectNode.get(“field”).as(Int/String/…)() 来访问指定的字段。

3. SimpleStringSchema可以将消息反序列化为字符串。当我们接收到消息并且反序列化失败的时候,会出现以下两种情况: 1) Flink从deserialize(..)方法中抛出异常,这会导致job的失败,然后job会重启;2) 在deserialize(..) 方法出现失败的时候返回null,这会让Flink Kafka consumer默默的忽略这条消息。请注意,如果配置了checkpoint 为enable,由于consumer的失败容忍机制,失败的消息会被继续消费,因此还会继续失败,这就会导致job被不断自动重启。

(二) Kafka Consumers 起始offset配置

FlinkKafkaConsumer 允许我们配置Kafka partition被消费的offset的起始位,示例代码如下:

所有版本的Flink KafkaConsumer都支持以上的配置,下面对这些配置进行详细的说明:

setStartFromGroupOffsets(默认):采用consumer group的offset来作为起始位,这个offset从Kafka brokers(0.9以上版本) 或 Zookeeper(Kafka 0.8)中获取。如果从Kafka brokers或者Zookeeper中找不到这个consumer group对应的partition的offset,那么auto.offset.reset这个配置就会被启用。

setStartFromEarliest() /setStartFromLatest(): 即从最早的/最新的消息开始消费。

当然,也可以指定具体的某个offset作为某个partition的起始消费位置:

上述的代码配置了myTopic的partition 0,1,2在被Flink job消费的起始位置。假设myTopic总共有5个partition,那么剩下的两个partition没有被配置具体的offset的起始位,所以Flink会对这两个partition的采用默认的offset起始位的配置(setStartFromGroupOffsets)。

注意,如果你在这个job中配置了enableCheckpointing() 或者从某个savepoint来启动这个job,那么起始位会优先从savepoint或者checkpoint中获取。

(三) 容错机制

当Flink的job开启了checkpoint的时候,Flink会一边消费topic的数据,一边定时的将offset和其他operator的状态记录到checkpoint中。如果遇到了job失败的情况,那么Flink将会重启job,从最后一个checkpoint中来恢复job的所有状态,然后从checkpoint中记录的offset开始重新对Kafka 的topic进行消费。记录offset的间隔决定了程序在失败的情况下需要回溯的最大程度。

为了使用Flink Kafkaconsumer的容错机制,我们需要在程序中作如下的配置:

还有一点需要注意的是,Flink只有在task slot的数量足够的情况下才可以成功的重启job,所以如果job是因为TaskManager down掉(或者无法连接到集群)导致task slot不足而失败,那么必须要恢复增加足够的task slot才能让job重启。而Flink on YARN 支持自动的重启丢失的YARN containers。

(四) offset提交行为的配置

Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。

二、Flink Kafka Consumer的运行机制

上图简要概括了FlinkKafkaConsumer08的运行机制。每个消费Kafka source的operator的subTask线程都持有一个FlinkKafkaConsumer08实例,这个实例负责分配这个subTask线程消费的topic的具体的partition,以及从checkpoint中恢复partition应该消费的起始offset。

Kafka08Fetcher负责和Kafka brokers通信,获取具体各个partition的leader,每个FlinkKafkaConsumer08都拥有一个Kafka08Fetcher。每个Kafka08Fetcher拥有一个或多个SimpleConsumerThread,SimpleConsumerThread负责从partition的leader中拉取数据,并将其反序列化,最后发送给下一级的operator,注意在SimpleConsumerThread中使用的是Kafka的低级API,这是因为它需要灵活的控制从某个具体的offset进行消费。

(一) 生命周期

下面,我们罗列出FlinkKafkaConsumer08、Kafka08Fetcher和SimpleConsumerThread生命周期的几个关键点(关于每个Flink StreamTask的生命周期,可以参考:https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/task_lifecycle.html),以弄清楚FlinkKafkaConsumer08是如何恢复offset 以及保证exactly-once的。

1.FlinkKafkaConsumer08:

1) initializeState():从最后一个成功的checkpoint中获取各个partition的offset到restoredState中。

2) open():从restoredState中获取这个subTask所消费的topic的partition的起始offset,保存到subscribedPartitionsToStartOffsets中;如果这是一个第一次向topic消费的job的subTask,那么Flink根据job的并行度以及这个subTask的index均匀的分配partition给这个subTask消费。此时,partition的起始offset就由我们在上文中介绍的配置来决定。

3) run(): 如果subscribedPartitionsToStartOffsets不为空,创建Kafka08Fetcher,执行其runFetchLoop()。

4) close(): job被cancel或者出现了异常,那么调用close()方法,close方法会调用Fetcher的cancel()方法,FlinkKafkaConsumer08所在的subTask结束。

2. Kafka08Fetcher:

Kafka08Fetcher从FlinkKafkaConsumer08中获取它要消费的partition加入到它的unassignedPartitionsQueue中。然后在它的runFetchLoop()中,对这个Queue中的partition进行消费。下面主要罗列runFetchLoop()中的主要细节。

1) 首先创建brokerToThread:Map>,这个map会把broker和连接到这个broker的线程映射起来。创建zookeeperOffsetHandler,根据配置的起始的offset行为从zookeeper或Kafka中获取没有从checkpoint中恢复的partition的offset。然后创建PeriodicOffsetCommitter线程周期性的向Zookeeper提交offset。

2) 向Flink内部(JMX)注册MetricGroup,名为"KafkaConsumer",我们可以利用这个MetricGroup来对FlinkKafakConsumer08的消费状况进行监控。

3) while(running) 循环:获取unassignedPartitionsQueue中应该被消费的partition,通过低级api寻找他们的Leader。然后从brokerToThread中寻找这个leader对应的SimpleConsumerThread是否被创建了,如果没有被创建,那么创建并运行一个SimpleConsumerThread,并更新brokerToThread;如果已经被创建,并且没有新的partition加入,那么重复这个while循环。否则,终止这个leader对应的SimpleConsumerThread,并创建新的SimpleConsumerThread然后继续消费。

4) cancel():如果发生了异常或者job被手动cancel,关闭zookeeperOffsetHandler、periodicCommitter和brokerToThread中的所有线程。

3.SimpleConsumerThread:

1) 创建SimpleConsumer(Kafka低级api),然后再一次根据配置的起始offset行为去Kafka中获取没有从checkpoint和Zookeeper中恢复的partition的offset。

2) while(running) 循环,我们跳过errorhandle,直接展示最核心的代码:

其中valueBytes和keyBytes均为从kafka中读取到的消息,他们经过反序列化后交给owner发送消息给下一级的operator。

这里的owner就是Kafka08Fetcher,它除了会将消息发送给下一级的operator外,还会记录这个partition的state(即offset),Flink异步的将这个state记录到checkpoint中。

这里需要注意的是,Flink异步记录checkpoint的行为是由我们的来配置的,只有当我们设置了enableCheckpointing()时,Flink才会在checkpoint完成时(整个job的所有的operator都收到了这个checkpoint的barrier才意味这checkpoint完成,具体参考我们对Flink checkpoint的介绍)将offset记录起来并提交,这时候才能够保证exactly-once。

3) 如果线程被终止,那么关闭SimpleConsumer。

(二) 容错机制

在发生错误的情况下,Flink会如何处理呢?在finally块中记录最后消费到的offset再向JobManager提交checkpoint吗?在通常情况下,比如发生了手动cancel或者userCode的异常时,这么做没有问题。可是如果是因为其他原因(如Full GC)使得TaskManagerhung住了,甚至是机器挂了,那么这个时候就不能通过finally 块来保证exactly-once了。Flink依赖的是带barrier的checkpointing机制来解决容错的问题。

我们通过下面一副图来简述这种机制:

barrier可以理解为checkpoint之间的分隔符,在它之前的data属于前一个checkpoint,而在它之后的data属于另一个checkpoint。同时,barrier会由source(如FlinkKafkaConsumer)发起,并混在数据中,同数据一样传输给下一级的operator,直到sink为止。假设我们的Streaming Job只有一个source、一个map operator 以及一个sink,属于barrier所分隔的checkpoint的数据已经被处理完毕并sink,而barrier还处于source和map operator之间,barrier还处于map和sink之间。由于barrier已经被sink收到,那么说明checkpoint已经完成了(这个checkpoint的状态为completed并被存到了state backend中),它之前的数据已经被处理完毕并sink。

但是由于sink还没有收到barrier,那么所有之前之后的数据都会被缓存在sink的Input Buffer中,也就是说这部分数据虽然已经经过source消费并经过map处理了,但是还是没有写入目的地。所以如果Job在这个时候失败了,最后一个成功committed的checkpoint是checkpoint,所以FlinkKafkaConsumer从checkpoint中恢复出相应的partitionoffset就可以了。

我们注意到,虽然之后的部分数据和之后的所有数据虽然已经被source消费,但是都没有被sink,这部分数据会被FlinkKafkaConsumer“重复”消费,我们并没有丢失任何的数据也没有重复写入任何数据,保证了exactly-once。

小节:

1. 在配置了checkpointingenable的情况下,FlinkKafkaConsumer08在开始消费数据之前,会优先从checkpoint中恢复出被消费的partition的offset,如果没有从checkpoint中恢复某些partition的offset,它会从Zookeeper中恢复,若从Zookeeper中仍然没有恢复,它会根据配置的offset起始行为来配置起始offset。

2.FlinkKafkaConsumer08通过Kafka的低级API和Flink带barrier的轻量级checkpoint机制保证了在高吞吐量的情况下的exactly-once。

三、监控

我们在前文提到,Flink会定时的将offset提交到Zookeeper中,但是提交到Zookeeper的offset并不是实时的offset。官方更为推荐从Flink注册的Metric来监控Flink的消费情况。

上面的两幅图片显示了analytics_package_standard这个kafka topic的 partition-0在Flink中的当前offset,这个offset我们可以通过jconsole、jvisualvm等工具查看,也可以直接通过Flink TaskManager开启的JMX 端口获取。如果我们要监控消费淤积,再从Kafka中获取相应partition的latestoffset即可。我们可以在配置文件flink-conf.yaml中配置Flink的JMX端口:

Flink 1.3中,还增加了许多有用的监控,比如总的消息条数,消息的瞬时读取速度,latency等等,我们可以在Flink的Web UI的Task Metric中查看这些监控指标:

结论

FlinkKafkaConsumer提供了一套健壮的机制保证了在高吞吐量的情况下exactly-once的消费Kafka的数据,它的API的使用与配置也比较简单,同时也便于监控。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180706B15PNS00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券