Kafka消费者主流程概况

1、外部调用方通过使用KafkaConsumer#poll()返回所订阅得消息集合。

2、KafkaConsumer#poll()由于是个非线程安全的类,所以需要通过KafkaConsumer#acquireAndEnsureOpen()来确保只有当前线程可以操作,如果检测到多线程并发的操作,则抛出异常。

3、SubscriptionState#hasNoSubscriptionOrUserAssignment()检验如果当前没有任何订阅,则还是抛出异常。

4、进行获取数据核心方法:KafkaConsumer#pollOnce()。

4.1、ConsumerNetworkClient#maybeTriggerWakeup()检测是否在不可中断的方法内。

4.2、ConsumerCoordinator#poll()提交请求offset的回调函数。

4.2.1、SubscriptionState#partitionsAutoAssigned()判断订阅状态是否自动分配分区。如果当前ConsumerCoordinator为空,则循环查找GroupCoordinator节点,否则一直阻塞。然后继续判断是否需要rejoin,如果需要刷新原数据。AbstractCoordinator#ensureActiveGroup()确保GroupCoordinator可用,AbstractCoordinator#startHeartbeatThreadIfNeeded()开启心跳线程。最终在AbstractCoordinator#joinGroupIfNeeded()中初始化JoinGroupRequest,并且发送请求到服务器端。

4.2.2、ConsumerCoordinator#pollHearbeat()检测心跳线程状态。如果heartbeatThread失败,将heartbeatThread设为空,然后抛出异常,下一次调用AbstractCoordinator#ensureActiveGroup()。如果heartbeatThread正常则Heartbeat#poll()更新下一次heartbeat的lastpoll时间戳。

4.2.3、ConsumerCoordinator#maybeAutoCommitOffsetsAsync()异步自动提交offset

4.3、消费者在拉取消息之前,会先判断所有的分区是否都有拉取偏移量。SubscriptionState#hasAllFetchPositions()判断是否所有的分区都存在有效的拉取偏移量。如果没有通过SubscriptionState#missingFetchPositions()找出TopicPartition集合,调用KafkaConsumer#updateFetchPositions()更新分区的信息,包括刷新已经提交的偏移量、更新拉取偏移量。

4.3.1、Fetcher#resetOffsetsIfNeeded()更新分区状态的拉取偏移量。

4.3.2、SubscriptionState#hasAllFetchPositions()校验一次,ConsumerCoordinator#refreshCommittedOffsetsIfNeeded()更新分区状态的提交偏移量。Fetcher#updateFetchPositions()更新分区状态中的拉取偏移量。

4.4、获取数据核心逻辑:Fetcher#fetchedRecords()。

4.4.1、创建一个以TopicPartition为key,ConsumerRecord列表为value的Map来存放所获取的结果集。然后初始一个还可获取最大记录数。

4.4.2、循环从completedFetches队列中消费CompletedFetch数据,之后把出列的CompletedFetch对象通过调用Fetcher#parseComletedFetch()方法校验所属的分区状态、所属的分区消费的offset是否与当前消费的位置匹配,然后才转换成PartitionRecords对象返回并设置为全局变量nextInLineRecords等待下一轮循环的消费处理。在下一个循环中同样会调用Fetcher#fetchRecords(),在该方法中继续检查校验SubscriptionState#isAssigned()是否已经分配的分区、匹配是不是一个可以被获取的数据,即不是被暂停或者没有有效offset的分区,然后从PartitionRecords#fetchRecords()中返回maxRecords的记录集合。最终按照TopicPartition放入Map结果集中。

4.4.3、在达到maxPollRecords数之后返回Map结果集。

4.5、如果结果集不为空,立即返回上一层。否则调用Fetcher#sendFetches()发送拉取请求。

4.5.1、Fetcher#createFetchRequests()获取Kafka集群的元数据。

4.5.2、Fetcher#fetchablePartitions()遍历那些可以获取数据的分区。先SubscriptionState#fetchablePartitions()返回那些可以拉取偏移量的分区。首先根据分区信息找到leader副本,找不到leader副本的更新元数据。然后把发往同一个节点的所有TopicPartition和PartitionData封装成一个FetchRequest对象。这样做的优势是减少客户端与服务端的网络连接。

4.5.3、ConsumerNetworkClient#send()将发往每个节点的FetchRequest都缓存在unset队列上。并添加RequestFutrueListener做处理FetchResponse的监听入口。RequestFutrueListener#onSuccess()方法是当futrue成功触发,然后Fetcher#matchesRequestedPartitions检查FetchRequest和FetchResponse数据key是否匹配。然后封装成CompletedFetch存入CompletedFetches队列中。

4.6、ConsumerNetworkClient#poll()是否阻塞。

4.7、长时间poll()之后,我们应该在获取返回数据之前再次检查这个组是否再次重新平衡动作,确保这个组可以稳定。

4.8、Fetcher#fetchedRecords()获取数据并返回。

5、KafkaConsumer#poll()获取到返回的数据之后,如果数据不为空则在返回最终结果前再次发起一次Fetcher#sendFetches()请求,节省下一次获取数据的请求时间。

6、如果存在ConsumerInterceptors,调用ConsumerInterceptors#onConsume()。

7、如果当此获取数据为空,计算是否当前方法的用时是否有超时,在没有超时的情况下继续循环获取数据。

8、KafkaConsumer#release()释放锁资源。

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20180126G091K200?refer=cp_1026

扫码关注云+社区