本文主要介绍消息队列 TDMQ RocketMQ 版中定时与延迟消息的概念和使用方式。
相关概念
定时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟到某个时间点被消费,这类消息统称为定时消息。
延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息。
实际上,延时消息可以看成是定时消息的一种特殊用法,其实现的最终效果和定时消息是一致的。
使用方式
开源 Apache RocketMQ 没有提供让用户自由设定延时时间的 API 的,TDMQ RocketMQ 版为了保证向开源 RocketMQ Client 的兼容,设计了一种通过添加 message 的 property 键值对来指定消息发送时间的方法。该方法只要在需要定时发送消息的
property 属性中增加 __STARTDELIVERTIME 属性值,就能在一定范围内(40天)实现该消息在任意时间的定时发送。延时消息则可以先通过计算得到定时发送的时间点,再以定时消息的形式发送。延时消息先通过
System.currentTimeMillis() + delayTime 计算得到定时发送的时间点,再以定时消息的方式发送。Message msg = new Message("test-topic", ("message content").getBytes(StandardCharsets.UTF_8));// 设定消息在10秒之后被发送long delayTime = System.currentTimeMillis() + 10000;// 将 __STARTDELIVERTIME 设定到 msg 的属性中msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));SendResult result = producer.send(msg);System.out.println("Send delay message: " + result);
Duration messageDelayTime = Duration.ofSeconds(10); final Message message = provider.newMessageBuilder() // Set topic for the current message. .setTopic(topic) // Message secondary classifier of message besides topic. .setTag(tag) // Key(s) of the message, another way to mark message besides message id. .setKeys("yourMessageKey-3ee439f945d7") // Set expected delivery timestamp of message. 设置延迟消息的时间 .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build();
实现原理
开源社区实现方案
最初,社区通过复用 RetryTopic 的多级重试投递逻辑,采用延时队列来实现延时消息。但是由于每一条队列对应一个延迟时间,导致队列数量成为瓶颈,因此最终支持多级别延时消息。以下是目前所支持的18个延时级别分别对应的延时时间。
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
多级别延时消息底层基于一个延时 Topic,多条 Queue,每一条 Queue 对应一个延时级别,如下图所示。

当延时消息进入时会判断延时等级属性,放进相应的队列尾部。通过线程池对每个队列进行轮询扫描,判断头部消息是否达到延时时间,如果达到则将消息投递到 Real Topic,否则继续轮询。
使用示例如下:
// 在Producer端设置消息为延时消息Message msg = new Message();msg.setTopic("TopicA");msg.setTags("Tag");msg.setBody("this is a delay message".getBytes());// 设置延迟level为5,对应延迟1分钟msg.setDelayTimeLevel(5);producer.send(msg);
但是开源社区的方案也存在一些不足:
系统仅支持预定义的有限延迟级别,限制了延迟配置的灵活性与适应性。
最大延迟时间存在硬性上限,无法满足超长周期延迟任务的需求。
延迟时间的精度控制不足,难以实现细粒度的延迟调度。
腾讯云消息队列 RocketMQ 版延时消息功能增强
由于多级别延时受限于队列数量,因此 TDMQ RocketMQ 在社区的基础上增强了一种支持高精度、超长延时,自定义延时的实现,即“超长秒级定时消息”的实现。
超长秒级定时消息支持用户设置任意时间(默认最长40天,5.x 铂金版可以定制支持),新的方案引入了文件版时间轮来实现,同时支持定时消息的撤回,在设计上通过消息的重新投递使超长延时消息不受消息存储时间限制:
定时消息实现不侵入原本存储逻辑,防止互相影响,通过将定时消息写入定时消息主题,对该主题的索引文件扫描从而拿到原始消息;
实现任意时间定时的要点在于知道在某一时刻需要投递哪些消息,因此需要额外设计存储格式,同时尽可能复用 Commitlog 消息文件存储,通过引入定时消息索引文件,原始消息存储在 Commitlog,同时为了能扫描出此刻的所有定时消息并且兼顾消息写入性能,采用链表结构进行索引单元的链接,定时消息索引文件写入直接 Append-only Log(顺序写入),保证了消息写入的性能;
为了定位第一条定时消息索引,引出时间轮结构,需要作为中间层去精准访问定时消息索引文件。
最终为定时消息(在 rip-43)引入两个存储文件:Timelog + Timewheel。

TimerWheel 是时间轮的文件,表示投递时间,它保存了2天(默认,同时保证超长定时消息不受消息存储时间限制)内的所有时间窗。每个槽位表示一个对应的投递时间窗,并且可以调整槽位对应的时间窗长度来控制定时的精确度。采用时间轮的好处是它可以复用,在2天之后无需新建时间轮文件,而是只要将当前的时间轮直接覆盖即可。
/*** Represents a slot of timing wheel. Format:* ┌────────────┬───────────┬───────────┬───────────┬───────────┐* │delayed time│ first pos │ last pos │ num │ magic │* ├────────────┼───────────┼───────────┼───────────┼───────────┤* │ 8bytes │ 8bytes │ 8bytes │ 4bytes │ 4bytes │* └────────────┴───────────┴───────────┴───────────┴───────────┘*/
TimerLog 是定时消息索引文件,保存定时消息的索引(在消息文件中存储的位置),内部通过反向链表进行链接,它的写入为 Append-only Log,保证了消息写入的性能。
public final static int UNIT_SIZE = 4 //size+ 8 //prev pos+ 4 //magic value+ 8 //curr write time, for trace+ 4 //delayed time, for check+ 8 //offsetPy+ 4 //sizePy+ 4 //hash code of real topic+ 8; //reserved value, just in case of
TimerWheel 中的每个槽位都可以保存一个指向 TimerLog 中某个元素的索引,TimerLog 中的元素又保存它前一个元素的索引。TimerLog 呈链表结构,存储着 TimerWheel 对应槽位时间窗所要投递的所有定时消息。

从上图中可以看出,共有五个 Service 分别处理定时消息的放置和存储。工作流如下:
1. 针对放置定时消息的 Service,每50ms从消息文件读取指定主题(TIMER_TOPIC)的定时消息。
1.1
TimerEnqueueGetService 从消息文件读取得到定时主题的消息,并先将其放入 EnqueuePutQueue。1.2 另一个线程
TimerEnqueuePutService 将其执行 Timerlog-unit 构建逻辑并放入 TimerLog,更新时间轮(Timewheel)的存储内容。2. 针对取出定时消息的 Service,每50ms读取下一秒的 Slot。有三个线程将读取到的消息重新放回 CommitLog。
2.1 首先,
TimerDequeueGetService 每 50ms 读一次下一秒的 Slot,从 TimerLog 中得到指定的数据,并放进 dequeueGetQueue。2.2 而后
TimerDequeueGetMessageService 从 dequeueGetQueue 中取出数据并根据索引信息,从消息文件中查出对应的 msgs,并将其放入待写入消息文件的队列中(dequeuePutQueue)。2.3 最后
TimerDequeuePutMessageService 将这个 Putqueue 中的消息取出,若已到期则修改 Topic,放回 Commitlog(投递到真正的 Topic),否则继续按指定主题(TIMER_TOPIC)写回 CommitLog 滚动(避免消息过期)。使用示例如下:
Message message = new Message(TOPIC, ("Hello" + i).getBytes(StandardCharsets.UTF_8));// 延迟 10s 后投递message.setDelayTimeSec(10);// 延迟 10000ms 后投递,投递到服务端后计算定时时间,即投递到服务端的时间+delayTimemessage.setDelayTimeMs(10_000L);// 定时投递,定时时间为当前时间 + 10000msmessage.setDeliverTimeMs(System.currentTimeMillis() + 10_000L);// 发送消息SendResult result = producer.send(message);
5.x 进一步的技术优化
TDMQ RocketMQ 5.x 的基于文件版,利用反向链表索引的方案,大大降低了存储成本,但是反向链表的扫描效率不高,SSD 盘下基本1000 TPS 就会成为瓶颈,造成调度误差增大。
腾讯云选取 RocksDB 支持定时消息多级时间轮,利用 KV 结构可以快速范围扫描某一时刻的定时消息,保证更精准的定时调度。
我们为小时/分钟/秒钟都设置为一个 Wheel 管理,类似钟表,通过秒针转动一圈驱动分针转动,分针转动一圈驱动时针转动;当定时超过一天时,仍放进小时级别的时间轮,后续会重新投递此消息,避免消息过期;

使用限制
使用延时消息时,请确保客户端的机器时钟和服务端的机器时钟(所有地域均为 UTC+8 北京时间)保持一致,否则会有时差。
定时和延时消息在精度上会有1秒左右的偏差。
关于定时和延时消息的时间范围,根据不同的集群规格,有不同的限制,可以参考产品系列。
使用定时消息时,设置的时刻在当前时刻以后才会有定时效果,否则消息将被立即发送给消费者。