消息去重

最近更新时间:2019-09-11 17:16:59

对于重复消息,最好的方法是消息可重入(消息重复消费对业务无影响)。做不到可重入时,需要在消费端去重。

一、重复消息出现的原因

网络异常、服务器宕机等原因都有可能导致消息丢失。CMQ 为了做到不丢消息、可靠交付,采用了消息生产、消费确认机制。

生产消息确认:生产者向 CMQ 发送消息后,等待 CMQ 回复确认;CMQ 将消息持久化到磁盘后,向生产者返回确认成功。否则在生产者请求超时、CMQ 返回失败等情况下,生产者需要向 CMQ 重发消息。

消费者确认:CMQ 向消费者交付消息后,将消息置为不可见;在消息不可见时间内,消费者使用句柄删除消息。如果消息未被删除,且不可见时间超时,消息将重新可见。

由于消息确认机制是“至少一次交付(at least once)”,在网络抖动、生产者/消费者异常等情况下,就会出现生产者重复生产、消费者重复消费的情况。

二、去重方案

要去重,先要识别重复消息。通常的做法是在生产消息时,业务方在消息体中插入去重key,消费时通过该 去重key 来识别重复消息。去重key 可以是由 <生产者 IP + 线程 ID + 时间戳 + 时间内递增值> 组成的唯一值。

只有一个消费者时,您可以将消费过的去重 key 缓存(如 KV 等),然后每次消费时检查 去重key 是否已消费过。去重key 缓存可以根据消息最大有效时间来淘汰。CMQ 提供了队列当前最小未消费消息的时间(min_msg_time),您可以使用该时间和业务生产消息最大重试时间来确定缓存淘汰时间。
存在多个消费者时,去重key 缓存就需要是分布式的。

• 根据消息最大有效时间,计算 key 过期时间:
current_time - max_retention_time - max_retry_time - max_network_time

• 根据 CMQ 最小未消费时间,计算 key 过期时间:
min_msg_time - max_retry_time - max_network_time

说明:

CMQ 可配置消息最大有效时间为15天,业务可根据实际情况调整。
CMQ队列当前最小未消费消息时间,即上图中最远时间点。该时间之前的消息都已经被删除,之后的消息可能未被删除。

三、举例说明

避免重复提交:

场景:A 为生产者,B 为消费者,中间是 CMQ。A 已完成10元转账操作,且将消息发送给 CMQ,CMQ 也已成功收到。此时网络闪断或者客户端 A 宕机导致服务端应答给客户端 A 失败。A 会认为发送失败,从而再次生产消息。这会造成重复提交。

解决方法:A 在生产消息时,加入 time 时间戳等信息,生成唯一的 去重key。若生产者 A 由于网络问题判断当前发送失败,重试时,去重key 沿用第一次发送的 去重key。此时消费者 B可通过 去重key 判断并做去重。
(该案例也说明了不能使用 CMQ 的 Message ID 进行去重,因为这两条消息有不同的 ID,但却有相同的 body。)

这里要注意的是,生产者A,在发送消息之前,要将去重key做持久化(写磁盘等,避免掉电后丢失)

避免多条相同 body 的消息被过滤:

场景:A 给 B 转账10元,一共发起5次,每一次提交的 body 内容是一样的。如果消费者粗暴用 body 做去重判断,就会把5次请求,当做1次请求来处理。

解决方法:A 在生产消息时,加入 time 时间戳等信息。此时哪怕消息 body 一样,生成的 去重key 都是不同的,这样就满足了多次发送同样内容的需求。