剩余的未消费消息的条数是如何计算的?
计算方式为:未消费消息数量 = 最大的 offset - 提交的 offset。如下图:

是否支持自动调整消息保留时间?
CKafka 支持添加动态消息保留策略功能,设置数据动态保留策略后,当磁盘空间使用率到达一定的比例后,会自动向前过期一定比例的数据,避免遇到用户消息猛增的情况,磁盘空间满了之后,则无法正常生产和消费。具体操作方式参见 配置磁盘水位处理策略。
消费进度未及时同步到服务端是什么原因?
问题现象
CKafka 消费进度未及时更新服务端,无法通过控制台看到消费速度。
前置储备
CKafka 集群展示消费进度,是以客户端消费位移 offset 的提交频率来评估消费速度和消费进度,故遇到此类问题优先排查客户端消费提交 offset 相关情况。

排查指引
步骤1:检查是否有生产数据写入
如果发现消费没有出现堆积和消费速度,可检查 Topic 以及生产监控指标,查看是否有数据写入。
1. 进入 控制台,单击实例列表 > ID/名称 > Topic 列表,查看对应 Topic 相关分区末端 offset 是否会更新。

2. 进入 控制台,单击实例列表 > ID/名称 > 监控 > Topic,查看对应 Topic 相应生产流量、Topic 占用磁盘的消息总量等指标是否大于 0。

如果 Topic 无实时数据写入,有下述两种情况:
新建消费组,配置
auto.offset.reset=latest
行为,则会从最新开始消费,由于没有实时数据写入,不会产生 commit offset
行为;Topic 数据过了保留时间,配置
auto.offset.reset=earliest
行为,则会从最头开始消费,由于数据已经 retention
,消费不到数据,也不会产生 commit offset
行为。上述情况不产生
commit offset
行为,故而,服务端不会更新消费进度。步骤2:检查客户端配置
对于 Kafka 原生 Client,需要检查 Consumer 配置,是否开启自动提交
offset:enable.auto.commit
。原生 Kafka Java Client 举例
一种直接检查客户端配置,还有一种方式可以从程序的日志里面搜索 ConsumerConfig,如下图所示。该日志只在 KafkaConsumer 初始化时打印,如果日志保留时间较短可能搜索不到。

依据客户端消费特点,如果可以开启自动提交 offset,则可以打开该参数,客户端消费进度将更新到服务端;
如果需要手动管理 offset 的提交,例如对重复消费或丢数有更严格要求,
enable.auto.commit
被关闭的情况下,需要检查客户端程序,是否在消费过程中,同步或者异步执行 offset 的提交动作,例如 consumer.commitSync()while (true) {ConsumerRecords<String, String> consumerRecords = consumer.poll(500);for (ConsumerRecord record : consumerRecords) {System.out.printf("收到消息: partition:%d, offset:%d, time:%d, key:%s, value:%s\\n", record.partition(), record.offset(), record.timestamp(), record.key(), record.value());}consumer.commitSync();}
原生 Kafka Client 经过上述排查调整后,再次观察客户端消费进度是否更新到服务端。
针对 Kafka 流式场景,Flink 出现频率相对较多
对于 Flink 消费客户端,其底层与 Kafka 的访问集成至 Flink的Kafka Source 接口,并与 Flink 框架容错机制相结合,例如 checkpoint 和 savepoint 机制。在这里就会面对两种情况:Kafka Client 底层 enable.auto.commit 处理和 Flink checkpoint 对 offset 的处理。
Kafka Source 在 checkpoint 完成时提交当前的消费offset ,以保证 Flink 的 checkpoint 状态和 Kafka broker 上的提交位点一致。
如果未开启 checkpoint,Kafka Source 依赖于 Kafka consumer 内部的位点定时自动提交逻辑,自动提交功能由 enable.auto.commit 和 auto.commit.interval.ms 两个 Kafka consumer 配置项进行配置。需要注意的是,如果用户在使用 Flink Kafka Source 时没有主动在配置中指定enable.auto.commit,Flink框架将会覆盖 enable.auto.commit=false,即关闭自动提交 offset,如果 Flink 未开启 checkpoint,则需要用户手动开启该参数。
过期消息没有被及时删除是什么原因?
原因分析
Kafka 的消息删除机制会导致某些业务场景出现过期消息没有及时删除的情况,如果对机制不了解容易产生疑惑,例如:分区0和分区7的消息时间戳存在明显差距,分区0的过期消息没有被及时删除。
Kafka 消息删除机制
Kafka 数据存储是以 Topic、分区、数据段三个维度实际落盘存储的,消息数据删除的条件如下:
消息数据根据保留时间进行删除,删除是以数据段为单位的。
每个数据段当前是设置为1GB大小,达到1GB后滚动生成新的数据段。
数据段内的所有消息都过期才会删除该数据段。
如果数据段内有一行消息在保留时间内,即例如段文件的最后一行是在保留时间内,这个段文件就不会被删除。
由于某些原因导致消息写入有倾斜,数据写入集中在某些分区,例如分区7,某些分区数据很少,例如分区0。此时分区0的数据段大小未达到1GB,没发生滚动,但整个段内有数据在保留时间内,所以分区0中的消息就不会被删除。