异常消费者隔离

最近更新时间:2025-05-27 09:53:02

我的收藏
在某些使用场景下,会出现消费者无法正常处理消息,但因网络连接无异常,服务端未感知到消费者异常,此时服务端会持续推送消息给消费者。为了缓解此类场景带来的异常消息堆积问题,Pulsar 提供了异常消费者隔离机制,本文将详细描述适用场景及实现方案。

场景和实践教程

适用场景

在使用 shared 订阅模式的时候,有些情况下,消费者可能由于机器或系统层面(例如文件系统、硬盘阻塞等)的原因导致消费逻辑阻塞。此时,虽然消费逻辑被阻塞,但客户端和服务端网络连接正常,这种情况下,服务端不会认为消费者已经离线,还会继续推送消息给消费者。但是由于消费者的消费逻辑被阻塞,推送给消费者的消息都无法消费,会产生 unack 消息,出现消息堆积的情况,且无法恢复。
当出现上述情况时,通常需要客户端主动重启或者关闭有问题消费者才能恢复。
为了解决上述场景问题,可以通过此功能从服务端侧主动发现并隔离异常消费者。

不适用的场景

1. 由于消息本身的问题造成的消息阻塞。例如某个消息的格式问题,导致消费的时候解析消息失败,失败之后导致消费逻辑阻塞。
2. 在此情况下,当开启这个功能后,服务端会将阻塞的消费者隔离,并且将异常的消息重新投递,可能会造成有问题的消息被反复投递到其他的消费者,可能造成更多的消费者出现消费阻塞的情况。
3. 无法确认消息的消费耗时。此时无法准确的设置 ACK 超时时间,可能导致消费者被错误的隔离。
4. 消费侧存在消费完后没有 ACK 的情况。这个问题要客户端侧自行保证,不能依赖此功能兜底。

实践教程

1. 异常消费者隔离功能只在 shared 订阅模式下生效。
2. ACK 超时时间要根据业务的消费耗时来确定,通常要远大于业务的消费常规消费逻辑的耗时,这样能避免错误的将消费者隔离,建议至少要 5min 以上(当然,如果业务的常规消费耗时就会超过 5min,那么这个值需要更大)
3. 针对上面不适用场景中描述的情况,要做好消费逻辑的异常处理,避免由于消息本身的解析等问题,造成消费阻塞的情况。可以考虑使用重试和私信机制,详情可参见 重试和私信机制
4. 服务端对消费者 ACK 超时的检查是通过定时任务实现的,定时任务的时间周期是 30s,如果设置的 ACK 超时时间是 5min,那么最多需要 5min 30s 把消费和设置为隔离状态。
5. 此功能的核心是兜底消费者消费完全阻塞的情况,并非是为了实现服务端的主动重新投递 unack 消息的功能而设计的,使用上要避免将该功能作为服务端主动重新投递的功能来使用(目前 Pulsar 中没有服务端超时重新投递消息的功能)
6. 客户端要防止出现漏 ACK 的情况(消费完没有 ACK 消息)。如果出现消费者漏 ACK 消息的情况,可能会导致服务端将对应的消费者识别为异常消费者,将消费者错误的加到黑名单中,同时,还会造成将已经消费过的消息被重新推送给其他消费,产生重复消费的问题。
7. 此功能在 key_shared 订阅模式下不生效。key_shared 订阅模式对业务的使用场景要求更加严苛,如果不能保证按照 key_shared 的适用场景来使用,则建议尽量避免生产环境中使用 key_shared 订阅模式,详情可参见 订阅模式

实现方案



方案描述

1. 当服务端有推送消息到消费者,但是超过一定时间没有接受到 ACK 请求,会将对应的消费者添加到黑名单,并将该消费者 hold 的所有 unack 消息重新推送到其他在线消费者实例。
2. 消费者加入到黑名单后,服务端不会继续推送消息到对应的消费者上。
3. 当黑名单中的消费者接收到 ACK 或者下线,将对应的消费者从黑名单中移除。
4. 至多允许不超过 60% 的消费者加入到黑名单中(防止异常情况下出现雪崩的情况,导致所有的消费者都无法正常消费)。

注意事项

服务端会维护每个消费者的最后 ACK 时间,但是服务端不会记录每一个消息的 ACK 时间,服务端每次收到消费者的 ACK(或 unack )请求,都会更新服务端维护的消费者最后 ACK 时间。用户在控制台上设置的 ACK 超时时间是为了和服务端维护的消费者最后 ACK 时间进行比较的,超过这个时间之后服务端会将消费者设置为隔离状态。

相关概念介绍

什么是 unack 消息?
消费者未确认的消息。是 shared 和 key_shared 订阅模式下的概念,表示服务端推送消息到消费者,但是服务端还没有接收到消费者的 ACK 请求的消息。
为什么要限制订阅的 unack 消息的数量?
unack 消息的消息 ID 会存储在服务端的内存中,过多的 unack 会占用更多的内存资源,通常会造成内存使用上升,gc 压力增加,极端情况下会甚至会造成内存溢出,导致服务不可用,给 Pulsar 实例带来稳定性风险。