消息空洞

最近更新时间:2025-09-17 20:39:01

我的收藏
IndividualDeletedMessages 是 Pulsar 中 shared 和 key_shared 订阅模式中订阅进度的一部分,一般也称为消息空洞。
我们都知道在 Pulsar 中,消费进度由两部分组成:
MarkDeletePosition:消费进度初始位置,此位置以及之前的消息,被认为已经全部消费过。等同于 Kafka 中的 offset。
IndividualDeletedMessages:消息确认集合,也可以称为消息空洞。在 MarkDeletePosition 之后,确认过的消息位置的集合。

订阅进度概念

消息消费进度图
消息消费进度图

消息消费进度图为例:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]
这里出现消息空洞,即确认消息不连续的情况有很多,常见的如下:
1. shared 或者 key_shared 消费模式下,某些消息消费的快,某些消息消费慢,很容易出现空洞的情况。
2. 主题中存在延迟消息的情况,上面 1:3、1:4、1:7 等位置的消息可能还没有到达延迟时间,但是 1:5、1:6、1:8 等消息已经达到延迟时间,这个时候也会出现空洞的情况。

消息空洞和 unack 的区别

unack 表示服务端已经推送消息到消费者的内存队列中,但是消费者并没有返回服务端消息确认(即没有调用 consumer.acknowledge 相关接口)。(常见的情况包括:消费慢、消费逻辑阻塞导致的 unack 等。)
例如消息消费进度图中:消息 1:4 和 1:10,是服务端推送给消费者,但是消费没有 ACK 的消息,这些消息会被计入到 unack 消息中。
说明:
综上所述,消息空洞和 unack 是两个不同的概念,两者没有直接关系。不过,unack 某些情况下会产生消息空洞(例如消息消费进度图的场景中)。

数量限制背景

订阅进度中的 IndividualDeletedMessages 部分需要定期持久化,持久化的过程是将 IndividualDeletedMessages 对象序列化成字符串,作为一条内部消息,存储到 Bookie 集群中,以便在主题重新加载后可以获取到完整的订阅进度信息。
在 Pulsar 中,存储的 IndividualDeletedMessages 的数量存在限制,是通过集群维度的配置项 managedLedgerMaxUnackedRangesToPersist 指定的,默认10000。当 IndividualDeletedMessages 的数量超过 1 万,只会持久化 IndividualDeletedMessages 中最前面的 1 万个,IndividualDeletedMessages 超过 1 万的部分会丢失。
注意:
IndividualDeletedMessages 未完全持久化的影响:消费进度不完整,主题因非预期重新加载的情况下(unload topic 或者 broker 重启)会造成部分消息被重复消费。
建议您为实例 Topic 添加空洞消息告警,相关阈值可以按照实际情况,推荐至少设置在 8000~10000 之间,以确保消息空洞数量不超过上限。相关配置可参考 配置告警
限制的原因主要归结于以下两个方面:
1. 如果不限制,当 IndividualDeletedMessages 过大的时候,内部消息长度会变大,消息队列并不适合处理过大的消息。这样,内部订阅进度产生的消息会给集群带来过大的压力,甚至影响整个集群的稳定性。
2. IndividualDeletedMessages 也是作为消息存储到 bookie 中,也需要遵循消息最大长度限制(5M)。根据之前测试经验,当 IndividualDeletedMessages 的数量超过20w,生成的消息长度就会达到5M。

消息空洞的产生和实践教程

IndividualDeletedMessages 的产生有以下4种场景。其中延迟消息和 key_shared 订阅可能会产生大量的 IndividualDeletedMessages,这两个场景下,更容易出现进度丢失的情况。

场景1:未确认消息

订阅模式:shared、key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]

IndividualDeletedMessages 的产生和消费行为相关,不同消息消费耗时不同。
服务端推送消息给客户端,每个消息的消费耗时不一致,可能出现较晚生产的消息先消费完成的情况。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
IndividualDeletedMessages 最多不会超过最大未确认消息数量(默认5000)。
结论:IndividualDeletedMessages 不能超过最大未确认消息数量(默认5000),通常无 IndividualDeletedMessages 超过managedLedgerMaxUnackedRangesToPersist 的风险。

场景2:延迟消息

订阅模式:shared、key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]

IndividualDeletedMessages 数量和延迟消息在队列中的分布情况有关系。
当读取消息的时候,发现消息是延迟消息,并且没有到达延迟时间,对应位置的消息无法推送到消费者。当后面的消息延迟时间先于前面发送的消息延迟时间达到,后面生产的消息会先消费完成。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
如果产生的空洞超过 1万个,当主题重新加载的时候,就会存在订阅进度部分丢失的情况。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。

实践教程

如何降低(控制)消息空洞的数量,(根据上面的介绍)在了解空洞(IndividualDeletedMessages)产生的原理后,下面的建议通常可以减少空洞数量,或者避免消息空洞引起的重复消息:
1. 消费端做好幂等,以此作为兜底方案。
2. 将「延迟消息」和「非延迟消息」分别放到不同的主题。
3. 尽量让延迟消息的延迟时间保持一定的顺序性(最好是递增)【需要结合实际的业务场景】。
4. 尽量让相同延迟时间的消息放到一起【需要结合实际业务场景】。
5. 扩容分区,同时做好分区间数据均衡。

优化思路

考虑到结合实际业务场景可能存在理解偏差,我们在这里提供两套思路,特别针对延迟消息保持顺序性以及相同的延迟时间的消息整合发送,以下优化思路存在业务系统改造
优化方案1-尽量让延迟消息的延迟时间保持一定的顺序性(类时间轮的机制)
将延迟时间为 1 天内的消息按照真实的延迟时间投递到随机延迟时间主题;将 1 天以上的延迟消息,按照固定的延迟时间投递到固定延迟时间主题;
业务同时订阅 2 个主题,当消费到消息发现延迟时间未到达用户实际的延迟时间,按照上面的逻辑(1 天内投递到随机延迟时间主题,超过 1 天投递到固定延迟时间主题)重新投递消息。
发送及接收方根据延时时间判断生产及消费所使用的主题
发送及接收方根据延时时间判断生产及消费所使用的主题

发送消息阶段
通过消息的到期时间计算所要发送的目标 topic。
将延迟时间 1 天内的延迟消息,按照实际延迟时间投递到主题-1(随机延迟时间主题,里面存放的消息按照消息真实的延迟时间设置延迟时间);
将延迟时间 1 天以上的延迟消息,按照固定1天的延迟时间投递到主题-2(固定延迟时间主题,里面存放的消息按照固定1天的延迟时间设置延迟时间),在消息的 Properties 中存放消息的真实延迟时间。
消费阶段
主题-1 和主题-2 都有消费者。
主题-1 中的消费者在延迟消息到期后直接消费;
主题-2 中的消费者在延迟消息到期后消费消息,根据消息的 Properties 获取消息的真实延迟时间,如果真实的延迟时间小于 1 天,按照真实的到期时间,投递到主题-1;如果真实的到期时间超过 1 天,继续按照固定 1 天的延迟时间,投递到主题-2 中。
方案的收益
主题-1 中的延迟消息控制在 1 天的时间范围内,空洞数量就得到了控制;
主题-2 中的延迟消息按照固定延迟设置,消息的到期时间保持有序,不存在随机延迟时间的场景下带来的空洞问题;
缺点
1. 真实延迟时间超过 1 天的消息,需要多次消费重投(消费重发),理论上最大需要重新投递次数=真实的延迟天数;
如何降低上面方案带来的延迟消息重复投递次数?
可以通过增加更高维度的时间轮方式进行优化。例如下图,增加5天的固定延迟时间主题,这样可以降低真实延迟时间是5天以上的消息重投的次数
增加更高维度的延迟时间主题
增加更高维度的延迟时间主题

2. 如果 1 天内的延迟消息带来的空洞规模依然很大,需要进一步缩短随机延迟时间主题的时间范围(例如缩短到 12h),这样会继续放大消息重投的次数。
优化方案2-尽量让相同延迟时间的消息放到一起(拆分多个主题)
拆分多个主题,每个主题存放延迟时间在指定某 1 天内到期的消息
拆分主题,分别放置延迟时间在同一天内的消息
拆分主题,分别放置延迟时间在同一天内的消息

发送消息阶段
通过消息的到期时间计算所要发送的目标 topic,如:
1 月 1 日到期的消息就发送到主题1;
1 月 2 日到期的消息就发送到主题2;
1 月 3 日到期的消息就发送到主题3。
消费阶段
每个主题当然都还是有消费者,但是每个消费者就只会消费到其对应负责的那个到期日期的消息:
1 月 1 日的消息最终会在 1 月 1 日到期时,在主题1 下的消费者消费到;
1 月 2 日的消息最终会在 1 月 2 日到期时,在主题2 下的消费者消费到;
1 月 3 日的消息最终会在 1 月 3 日到期时,在主题3 下的消费者消费到。
方案的收益
控制单个主题内延迟消息的延迟时间的范围分布,每个主题的延迟消息都是同一天内的,那么其空洞都只会集中在一天的范围内,空洞数量就得到了控制。
缺点
1. 需要管理更多的主题,并维护流转关系;
2. 如果 1 天内的延迟消息带来的空洞规模依然很大,需要进一步拆分,可能需要引入管理更多的主题。

场景3:key_shared 订阅

订阅模式:key_shared
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]

IndividualDeletedMessages 数量和消息的推送情况相关。
当某些 key 的数量过多或者消费过慢,consumer 无法接收更多对应 key 的消息时,服务端读取到对应 key 的消息后,无法推送给客户端。如果后面发送的消息可以投递给消费者,并且先于前面的消息消费完成。这个时候,会导致 IndividualDeletedMessages 的数量的增加。
如果产生的空洞超过 1w 个,当主题重新加载的时候,就会存在订阅进度部分丢失的情况。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。

实践教程

1. 消费端做好幂等。
2. 做好消费能力的快速提升(横向扩容能力),尽量避免出现消息堆积的情况【这一点和官网对于 key_shared 的订阅模式的实践教程一致】。
3. Key 数量多且每个 Key 的消息分布相对均匀,避免由于部分 key 数据倾斜或者处理慢造成堆积。

场景4:Exclusive/Failover 订阅

订阅模式:Exclusive、Failover
订阅进度:
MarkDeletePosition = 1:2
IndividualDeletedMessages = [ (1:4 - 1:6], (1:7 - 1:8] ]

和场景1【未确认消息】的区别是,Exclusive 和 Failover 订阅模式下,服务端不统计 unack 消息,consumer 只要 receive 消息,服务端就可以继续推送后续的消息。所以,如果有消息没有确认,会导致 IndividualDeletedMessages 增加。
结论:IndividualDeletedMessages 可能超过 managedLedgerMaxUnackedRangesToPersist。

实践教程

1. 推荐尽量使用累积确认(acknowledgeCumulative)的方式,可以避免产生空洞(IndividualDeletedMessages)。
2. 如果使用的是单条确认(acknowledge)的方式。
消费端做好幂等。
做好分区间消息数量的均衡。
做好消费能力的快速提升(横向扩容能力),尽量避免出现消息堆积的情况。
3. 如果是 Failover 订阅模式,通过扩容分区,可能缓解。