前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka的消费者组(下)

kafka的消费者组(下)

作者头像
陈猿解码
发布2023-02-28 15:15:04
7070
发布2023-02-28 15:15:04
举报
文章被收录于专栏:陈猿解码陈猿解码

上一文对消费者组的一些概念,基本原理进行了简单描述,本文继续来聊聊消费者组中另外一个比较重要的内容:偏移量的存储。

【消费者偏移量的提交】

1. 消息消费的整体流程介绍

消费者在成功加入消费者组,并得到分配的分区信息后,对分配的分区依次向服务端发送请求获取上一次提交的偏移信息,并在内存中记录获取到的偏移量信息;

随后向服务端发送fetch(消息)请求,在该请求中就包含了内存中记录的偏移量信息,即从指定位置进行消息的消费。

服务端根据请求信息从log文件中读取文件,并给予响应。

客户端收到消息后,在内存中更新消费的偏移量信息,并由使用者手动或自动向服务端提交消费的偏移量信息。

2. 偏移量的提交流程

消费者的偏移量是由消费者自己来进行提交的,当前提交的方式有两种,自动提交或手动提交。

1)自动提交

当配置项"enable.auto.commit"设置为true后,消费者开启自动提交偏移的模式。自动提交本质上是消费者内部的轮询线程定时、异步对内存中记录的偏移量信息进行提交。

定时的时间间隔是由配置项"auto.commit.interval.ms"的值来决定的。

2)手动提交

当配置项"enable.auto.commit"设置为false后,也就禁用了自动提交偏移量的功能。此时使用者在处理消费的消息的同时,需要调用"commitSync"来手动提交消费偏移量信息。当然,从函数的字面意思也可以看出,手动提交请求动作是同步完成的。

【偏移量在服务端的存储】

kafka服务端对于消费者偏移量提交请求的处理,最终是将其存储在名为"__consumer_offsets"的topic中(其处理流程本质上是复用了向该topic生成一条消息的流程)。该topic默认有50个分区,不同消费者组的偏移量信息存储在不同的分区中,具体按照如下公式计算出存储的分区号:

代码语言:javascript
复制
Math.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
// groupId.hashCode 为消费者组名称的哈希值
// groupMetadataTopicPartitionCount 为__consumer_offsets的分区数

也就是说,一条偏移量提交的请求,以一个消息记录的形式在topic中存储。该消息记录分为key,value两部分,在key中记录了偏移量对应的消费者组名称、消费的topic名称以及分区编号;而在value中则记录了具体的偏移位置,元数据,以及提交时间戳和过期时间戳。

来看个实际实例:在名为hncscwc的topic的第3个分区上,有2条消息,名为spurs的消费者组在该topic上完成消费并提交了偏移量后的情况:

首先,可以通过命令,查看该消费者的偏移量情况:

代码语言:javascript
复制
sh kafka-consumer-groups.sh --bootstrap-server 192.168.42.198:9092 --describe --group spurs
Consumer group 'spurs' has no active members.

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET  LAG CONSUMER-ID     HOST CLIENT-ID
hncscwc 3          2               2               0               - - -

其次,根据上面的计算公式,可以知道消费者组的偏移量存储分区为43。

代码语言:javascript
复制
scala> Math.abs("spurs".hashCode) % 50
res1: Int = 43

最后,直接查看"__consumer-offsets-43"下的log文件的内容

需要注意的是:kafka在运行过程中仅在内存中记录了消费者组的相关信息(包括当前成员信息、偏移量信息等)。那么当删除了__consumer_offset对应的消息记录或者消息超过存储的有效期被自动删除后,对应的消费者组信息也随之消失了。

【偏移量失效的处理策略】

1. 消费者偏移量 out of range的场景

根据前面的介绍可以知道,生产消费的消息与消费者偏移量是分别存储在两个topic中的,通常来说,消费者在加入消费者组后,会从服务端获取对应分区的消费偏移量,这个偏移量一定是在正常生产消息的偏移量范围之内的。然而,在一些特定场景下,也会出现消费者偏移量不在生产的消息的偏移量范围之内的情况。下面就分别举例说明下:

1)消费的偏移量小于实际消息的偏移量

当使用者对topic配置了消息预留期限,或者称之为生命周期(retention),随着时间的推移,消息被删除(也可能是手动删除了老的消息),就可能出现实际消费的偏移量,小于已存储最小消息的偏移量的情况。

2)消费的偏移量大于实际消息的偏移量

一种可能出现该情况的场景是:生产者往topic发送消息的同时,消费者也在进行消费,并且最新消息均消费后进行了offset的提交,服务端在对消费者偏移量的记录完成刷盘动作后,生产消费的topic分区leader节点出现掉电异常,导致实际消息并未写入磁盘,从而出现这种情况。该场景比较难出现,但我们在实际环境中确实遇到过。

2. 消费者的处理策略

不管是上面那种情况,消费者在消费过程中,都会出现"out of range"的异常。在出现该异常后,由配置项"AUTO_OFFSET_RESET_CONFIG"来决定处理策略。该配置项可选的值包括:

  • none

即不做任何处理,kafka客户端直接将异常抛出,调用者可以捕获该异常来决定后续处理策略。

  • earliest

将消费者的偏移量重置为最早(有效)的消息的偏移位置,从头开始消费。这可能会引起消息的重复消费。

  • latest

将消费者的偏移量重置为最新的消息的偏移位置,从最新的位置开始消费。这可能会引起部分消息未进行消费出现消息"丢失"的情况。

关键的代码逻辑如下所示:

另外,在flink的kafka-connector和spark streaming中,该配置项的默认值不同,使用时需要注意。

【小结】

本文主要介绍了kafka消费者组中消费者偏移量的相关内容,并通过一些实际例子对原理分析进行论证,感兴趣的小伙伴们也可以对其中的内容自行测试分析。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-02-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档