专栏首页后端技术KafkaConsumer 组件源码 ConsumerCoordinator

KafkaConsumer 组件源码 ConsumerCoordinator

ConsumerCoordinator继承于AbstractCoordinator,也是其唯一的实现类。AbstractCoordinator定义了有关集群协调的逻辑,定义了消费者与特定的broker(cordinator)交互的逻辑,供消费者加入消费组、探知消费组状态。

/**
 * This class manages the coordination process with the consumer coordinator.
 */
public final class ConsumerCoordinator extends AbstractCoordinator {

/**
 * AbstractCoordinator implements group management for a single group member by interacting with
 * (很长的注释,建议读者阅读一遍,此处省略)
*  ...
 */
public abstract class AbstractCoordinator implements Closeable {

ConsumerCoordinator主要负责与消费者组coordinator间的联系,比如发现coordinator、加入group、还有查询提交的offset、提交offset。 在加入group后,还会启动HeartBeatThread维持与coordinator的心跳,维持成员状态。

主要流程

在KafkaConsumer::pollOnce中,总是会先调用一次ConsumerCoordinator::poll。

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();

        long startMs = time.milliseconds();
        coordinator.poll(startMs, timeout);
        ...

在poll调用中除了处理offset commit,还有与coordinator取得联系、完成rebalance。 阅读过Kafka消费者Rebalance机制的话[1]就知道:

  • coordinator就是指__consumers_offsets对应分区的leader,ensureCoordinatorReady会与某broker联系,找到coordinator,并与之建立连接
  • rebalance分为join和sync两个阶段,ensureActiveGroup会检查自己的group状态,与coordinator联系,完成加入group的流程。

ensureCoordinatorReady

/**
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public synchronized void ensureCoordinatorReady() {
  // Using zero as current time since timeout is effectively infinite
  ensureCoordinatorReady(0, Long.MAX_VALUE);
}

然后阅读ensureCoordinatorReady(long startTimeMs, long timeoutMs)即可,大概逻辑就是:

  1. 调用lookupCoordinator向某个broker发送寻找coordinator的请求,等待收到响应,收到响应后,FindCoordinatorResponseHandler进行处理,与coordinator建立连接
  2. 如果报错,就更新Metadata再重试

一些总结

ConsumerCoordinator、AbstractCoordinator有大量的逻辑判断代码,又会经常修改自身状态和SubscriptionState。如果单纯地去记忆发出xx请求,收到响应/报错后做什么,根本无法记下来,而且也无助于理解。个人觉得:

  1. 可以先从poll入手,去阅读主要的流程,再深入看其它细节
  2. 可以假设一切请求/响应都成功时,流程是怎么推进的

ConsumerCoordinator和HeartBeatThread都有以下特点,可能有助于代码的阅读:

  1. 利用ConsumerNetworkClient完成与Kafka节点的通信,发出请求、制定异步响应流程
  2. 请求-响应流程是异步的,因此到处可见用RequestFuture[2]来构建异步流程的操作。
  3. 都会并发修改集群状态,造成race condition,因此调用synchronized (AbstractCoordinator.this)进行同步
  4. 定义了大量xxResponseHandler,制定了收到响应后的行为。比如JoinGroupResponseHandler描述了发出join group请求并收到响应后的行为。

  1. 比如Kafka消费者组
  2. 读者需要理解RequestFuture的用法,否则会对各种addListener、compose、chain调用感到疑惑,而这些都是制定异步流程的方法。可阅读RequestFuture原理

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Ex1:图像读取和显示以及像素操作

    dev下搭建环境,详情见https://www.jianshu.com/p/d5e18b9b0333

    平凡的学生族
  • kafka 内存管理 BufferPool

    结合代码可知,BufferPool负责ByteBuffer的申请和释放。 BufferPool会维持一组大小为poolableSize的ByteBuffer,...

    平凡的学生族
  • 原子操作 CAS CompareAndSwap

    使用AtomicInteger、AtomicBoolean等原子操作类可以完成原子操作。它的各种操作都是基于Unsafe类的,你可以看到函数的画风都是下面这样:

    平凡的学生族
  • HTML5-FileReader

    链接:https://developer.mozilla.org/zh-CN/docs/Web/API/FileReader/onload

    eadela
  • DDoS防护——中国互联网企业的“出海之盾”

    2020年开年,一系列“活久见“级别的黑天鹅事件给世界造成了巨大的冲击,其中一个正在加速的变化就是,现实世界的娱乐、社交等生活边界被虚拟网络蚕食鲸吞。以移动互联...

    腾讯安全
  • [语音识别] 单音素、三音素、决策树

    以前的音标现在也可以叫音素,而且现在正广泛的把音标叫音素。 每一种语言中的音素都是不一样的,即使是同种语言中,方言的音素也是不一样的。音素应该与人体的发音严格的...

    MachineLP
  • HTML5 FileReader 读取txt文件

    版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/...

    j_bleach
  • 音素 – phone | phonetics

    在语音合成中,有一种关键技术是将文字拆解成音素,再去语音库里匹配相同音素的语音片段,来实现文字转换语音,那么音素到底是什么?

    easyAI
  • 美国邮政署开始测试自驾卡车运输服务

    美国政府打算将信件及包里透过无人驾驶车运送,委托中国自驾卡车业者TuSimple执行短期自驾车送件任务

    阿泽
  • 需求收集

    首先,需求人员在与用户确认需求的过程中,一定不要放过任何一个细节,仔细体会用户的每一个要求。对于用户的要求,需求人员需要对其加以梳理: 哪些是合理的需求,哪些是...

    公众号php_pachong

扫码关注云+社区

领取腾讯云代金券