前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KafkaConsumer 组件源码 ConsumerCoordinator

KafkaConsumer 组件源码 ConsumerCoordinator

作者头像
平凡的学生族
发布2020-07-02 17:13:50
6310
发布2020-07-02 17:13:50
举报
文章被收录于专栏:后端技术后端技术后端技术

ConsumerCoordinator继承于AbstractCoordinator,也是其唯一的实现类。AbstractCoordinator定义了有关集群协调的逻辑,定义了消费者与特定的broker(cordinator)交互的逻辑,供消费者加入消费组、探知消费组状态。

/**
 * This class manages the coordination process with the consumer coordinator.
 */
public final class ConsumerCoordinator extends AbstractCoordinator {

/**
 * AbstractCoordinator implements group management for a single group member by interacting with
 * (很长的注释,建议读者阅读一遍,此处省略)
*  ...
 */
public abstract class AbstractCoordinator implements Closeable {

ConsumerCoordinator主要负责与消费者组coordinator间的联系,比如发现coordinator、加入group、还有查询提交的offset、提交offset。

在加入group后,还会启动HeartBeatThread维持与coordinator的心跳,维持成员状态。

主要流程

在KafkaConsumer::pollOnce中,总是会先调用一次ConsumerCoordinator::poll。

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        client.maybeTriggerWakeup();

        long startMs = time.milliseconds();
        coordinator.poll(startMs, timeout);
        ...

在poll调用中除了处理offset commit,还有与coordinator取得联系、完成rebalance。

阅读过Kafka消费者Rebalance机制的话[1]就知道:

  • coordinator就是指__consumers_offsets对应分区的leader,ensureCoordinatorReady会与某broker联系,找到coordinator,并与之建立连接
  • rebalance分为join和sync两个阶段,ensureActiveGroup会检查自己的group状态,与coordinator联系,完成加入group的流程。

ensureCoordinatorReady

/**
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public synchronized void ensureCoordinatorReady() {
  // Using zero as current time since timeout is effectively infinite
  ensureCoordinatorReady(0, Long.MAX_VALUE);
}

然后阅读ensureCoordinatorReady(long startTimeMs, long timeoutMs)即可,大概逻辑就是:

  1. 调用lookupCoordinator向某个broker发送寻找coordinator的请求,等待收到响应,收到响应后,FindCoordinatorResponseHandler进行处理,与coordinator建立连接
  2. 如果报错,就更新Metadata再重试

一些总结

ConsumerCoordinator、AbstractCoordinator有大量的逻辑判断代码,又会经常修改自身状态和SubscriptionState。如果单纯地去记忆发出xx请求,收到响应/报错后做什么,根本无法记下来,而且也无助于理解。个人觉得:

  1. 可以先从poll入手,去阅读主要的流程,再深入看其它细节
  2. 可以假设一切请求/响应都成功时,流程是怎么推进的

ConsumerCoordinator和HeartBeatThread都有以下特点,可能有助于代码的阅读:

  1. 利用ConsumerNetworkClient完成与Kafka节点的通信,发出请求、制定异步响应流程
  2. 请求-响应流程是异步的,因此到处可见用RequestFuture[2]来构建异步流程的操作。
  3. 都会并发修改集群状态,造成race condition,因此调用synchronized (AbstractCoordinator.this)进行同步
  4. 定义了大量xxResponseHandler,制定了收到响应后的行为。比如JoinGroupResponseHandler描述了发出join group请求并收到响应后的行为。

  1. 比如Kafka消费者组
  2. 读者需要理解RequestFuture的用法,否则会对各种addListener、compose、chain调用感到疑惑,而这些都是制定异步流程的方法。可阅读RequestFuture原理
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 主要流程
  • ensureCoordinatorReady
  • 一些总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档