前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >知根知底:Flink-KafkaConsumer 详解

知根知底:Flink-KafkaConsumer 详解

作者头像
Flink实战剖析
发布2022-06-10 17:53:53
7550
发布2022-06-10 17:53:53
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

Flink-Kafka Connector 是连接kafka 的连接器,负责对接kafka 的读写, 本篇主要介绍kafka consumer 的执行流程与核心设计。

逻辑执行流程

  1. 分配当前task消费的partition与起始的offset : 根据从状态中恢复的数据与客户端指定的消费模式, 采取的方式是状态中offset优先, 即从状态中能够找到对应的offset 就使用该offset , 否则就根据客户端指定的方式
  2. 从kafka 中不断拉取数据, 发送到下游,并且保存当前的offset
  3. 为了保证整个任务的全局一致性,需要将offset 提交到状态中
  4. 如果开启了分区发现模式,那么需要将检测到新的分区添加到消费线程中。

两个重要接口

Flink 保证全局数据一致性是通过全局状态快照checkpoint 完成的, 也就是周期性的执行checkpoint 将当前的任务状态保存起来, Flink 在整个checkpoint 的执行过程中提供了两个接口,方便用户去做一些自定义的操作, 例如操作状态、两阶段提交实现等等。

CheckpointedFunction接口

提供了initializeState方法与snapshotState方法,initializeState方法是在任务初始化时候执行,常见的就是获取的checkpoint 中的状态数据;snapshotState方法是在每次checkpoint触发都会执行,常见的就是将数据存放在状态对象中,以便能够被持久化。

CheckpointListener接口

提供了notifyCheckpointComplete方法与notifyCheckpointAborted方法,这两个方法都是在一次checkpoint 完成之后执行,那么有可能是通知成功回调(notifyCheckpointComplete)也有可能失败回调(notifyCheckpointAborted)。

具体实现

对于Flink 来说source端的标准对接接口是SourceFunction ,主要实现其run方法,在run 中执行数据的pull操作;另外为了保证整个状态的一致性,在checkpoint时需要记录checkpoint时的offset, 并且保证其失败重启时也能够从checkpoint 记录的offset开始消费, 因此同时实现了CheckpointedFunction接口与CheckpointListener接口,这两个接口提供了可操作状态的一些方法。

FlinkKafkaConsumerBase 实现SourceFunction、CheckpointedFunction、CheckpointListener接口的抽象类,包含了整个流程的核心方法,如下:

initializeState

从checkpoint 中 恢复最近一次或者是指定批次checkpoint 中offset, 并将其存放在TreeMap<KafkaTopicPartition,Long> 结的 restoredState 对象中

open

主要作用就是分配当前task消费的partitioin 的offset 位置

1. partition 分配策略:姑且认为是当前task的下标与 partition%numTask 相等就分配给当前task

2. offset 分配策略:有状态数据就使用状态数据的offset ; 没有就根据客户端指定的StartupMode作为消费起点

run

开始消费kafka 中数据, 通过 KafkaFetcher 完成 :

1. 启动了一个消费线程 KafkaConsumerThread 从kakfa 中拉取数据,将其存储到 Handover 的next 对象中

2. 循环从Handover 的next 中获取数据

3. 记录下当前的offset, 更新到subscribedPartitionStates 中去

createAndStartDiscoveryLoop

在run 方法中被调用, 开启了异步分区发现的线程discoveryLoopThread,会按照指定的时间间隔检查是否有新的分区(默认情况下不开启), 当发现有新的分区时会将其添加到unassignedPartitionsQueue中, 以便被KafkaConsumerThread 线程检测到

snapshotState

将记录的subscribedPartitionStates 中消费进度数据写入到 unionOffsetStates 状态中与临时对象pendingOffsetsToCommit中

notifyCheckpointComplete

提交offset 至kafka中:将pendingOffsetsToCommit 中记录当前批次checkpoint 的offset 数据提交到kafka 中

核心流程

  1. KakfaConsumerThread 线程不断从Kafka 中消费数据
  2. 消费的数据存储handover 中
  3. kafkaFetch 不断从handover 获取数据进行处理

其他流程

  1. initializeState、snapshotState 这两个方法是实现了CheckpointedFunction接口里面的对应方法,CheckpointedFunction 接口是Flink 提供的两个hook, 任务初始化执行initializeState,用于从状态中恢复数据, 优于open先执行, 用于其恢复offset数据;snapshotState 每次触发checkpoint 时执行,提供用户操作hook, 用于将offset 数据保存在状态中。
  2. notifyCheckpointComplete 是实现了CheckpointListener 接口中的方法, checkpoint 完成之后的回调方法, 提交状态中的offset数据至kafka中。

offset 提交

对于整个offset的提交至kafka中, 类似于两阶段的提交过程:

  • 第一阶段:执行checkpoint 时即调用snapshotState方法, offset 保存到状态中
  • 第二阶段:checkpoint 执行完成时回调notifyCheckpointComplete方法,offset 提交到kafka中

对于第一阶段失败任务直接重启,从最近一次checkpoint记录的位点开始消费,对于第二阶段提交offset至kafka如果失败,并不会导致任务重启,只是做了日志记录,因为提交offset到kafka成功与否并不会影响任务的执行。

启动时offset指定

  • 如果是从checkpoint 恢复,那么就会忽略客户端所指定的startMode , 也就是checkpoint 状态数据优先

总结

本篇主要介绍了FlinkKafkaConsumer的核心设计流程与实现,同时介绍了与checkpoint流程结合完成offset的管理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 逻辑执行流程
  • 两个重要接口
  • 具体实现
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档