前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka Consumer的配置

Kafka Consumer的配置

作者头像
加米谷大数据
发布2018-07-25 17:45:35
1.7K0
发布2018-07-25 17:45:35
举报
文章被收录于专栏:加米谷大数据加米谷大数据

FlinkKafkaConsumer08可以消费一个或多个Kafka topic的数据,它的构造器需要接收以下参数:

1. topic名或 topic名的列表

2. 反序列化约束,以便于Flink决定如何反序列化从Kafka获得的数据

3. Kafka consumer的属性配置,下面两个属性配置是必须的:

  • “zookeeper.connect” (Zookeeper servers的地址列表,以逗号分隔)
  • “group.id” (consumer group)
  • “bootstrap.servers” (Kafka brokers的地址列表,以逗号分隔)

示例代码:

以下几个参数是需要我们重点关注的。

1

反序列化shema

Flink Kafka Consumer 需要知道如何将来自Kafka的二进制数据转换为Java/Scala对象。DeserializationSchema接口允许程序员指定这个序列化的实现。该接口的 T deserialize(byte[]message) 会在收到每一条Kafka的消息的时候被调用。

我们通常会实现 AbstractDeserializationSchema,它可以描述被序列化的Java/Scala类型到Flink的类型(TypeInformation)的映射。如果用户的代码实现了DeserializationSchema,那么就需要自己实现getProducedType(...) 方法。

为了方便使用,Flink提供了一些已实现的schema:

1. TypeInformationSerializationSchema (andTypeInformationKeyValueSerializationSchema) ,他们会基于Flink的TypeInformation来创建schema。这对于那些从Flink写入,又从Flink读出的数据是很有用的。这种Flink-specific的反序列化会比其他通用的序列化方式带来更高的性能。

2. JsonDeserializationSchema (andJSONKeyValueDeserializationSchema) 可以把序列化后的Json反序列化成ObjectNode,ObjectNode可以通过objectNode.get(“field”).as(Int/String/…)() 来访问指定的字段。

3. SimpleStringSchema可以将消息反序列化为字符串。当我们接收到消息并且反序列化失败的时候,会出现以下两种情况: 1) Flink从deserialize(..)方法中抛出异常,这会导致job的失败,然后job会重启;2) 在deserialize(..) 方法出现失败的时候返回null,这会让Flink Kafka consumer默默的忽略这条消息。请注意,如果配置了checkpoint 为enable,由于consumer的失败容忍机制,失败的消息会被继续消费,因此还会继续失败,这就会导致job被不断自动重启。

2

Kafka Consumers 起始offset配置

FlinkKafkaConsumer 允许我们配置Kafka partition被消费的offset的起始位,示例代码如下:

所有版本的Flink KafkaConsumer都支持以上的配置,下面对这些配置进行详细的说明:

  • setStartFromGroupOffsets(默认):采用consumer group的offset来作为起始位,这个offset从Kafka brokers(0.9以上版本) 或 Zookeeper(Kafka 0.8)中获取。如果从Kafka brokers或者Zookeeper中找不到这个consumer group对应的partition的offset,那么auto.offset.reset这个配置就会被启用。
  • setStartFromEarliest() /setStartFromLatest(): 即从最早的/最新的消息开始消费。

当然,也可以指定具体的某个offset作为某个partition的起始消费位置:

上述的代码配置了myTopic的partition 0,1,2在被Flink job消费的起始位置。假设myTopic总共有5个partition,那么剩下的两个partition没有被配置具体的offset的起始位,所以Flink会对这两个partition的采用默认的offset起始位的配置(setStartFromGroupOffsets)。

注意,如果你在这个job中配置了enableCheckpointing() 或者从某个savepoint来启动这个job,那么起始位会优先从savepoint或者checkpoint中获取。

3

容错机制

当Flink的job开启了checkpoint的时候,Flink会一边消费topic的数据,一边定时的将offset和其他operator的状态记录到checkpoint中。如果遇到了job失败的情况,那么Flink将会重启job,从最后一个checkpoint中来恢复job的所有状态,然后从checkpoint中记录的offset开始重新对Kafka 的topic进行消费。记录offset的间隔决定了程序在失败的情况下需要回溯的最大程度。

为了使用Flink Kafkaconsumer的容错机制,我们需要在程序中作如下的配置:

还有一点需要注意的是,Flink只有在task slot的数量足够的情况下才可以成功的重启job,所以如果job是因为TaskManager down掉(或者无法连接到集群)导致task slot不足而失败,那么必须要恢复增加足够的task slot才能让job重启。而Flink on YARN 支持自动的重启丢失的YARN containers。

4

offset提交行为的配置

Flink KafkaConsumer允许配置向 Kafka brokers(或者向Zookeeper)提交offset的行为。需要注意的是,Flink Kafka Consumer并不依赖于这些提交回Kafka或Zookeeper的offset来保证容错。这些被提交的offset只是意味着Flink将消费的状态暴露在外以便于监控。

  • Checkpointingdisabled: 此时, Flink Kafka Consumer依赖于它使用的具体的Kafka client的自动定期提交offset的行为,相应的设置是 Kafka properties中的 enable.auto.commit (或者 auto.commit.enable 对于Kafka 0.8) 以及 auto.commit.interval.ms。
  • Checkpointingenabled: 在这种情况下,Flink Kafka Consumer会将offset存到checkpoint中当checkpoint 处于completed的状态时。这保证了在Kafka brokers中的committed offset和checkpointed states中的offset保持一致。通过调用setCommitOffsetOnCheckpoints(boolean)来调整 offset自动提交是否开启(默认情况下是true,即开启自动提交)。请注意,在这种情况下,配置在properties 中的offset的定时自动提交行为将会被忽略。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-05-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 加米谷大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档