简介
定时/延时消息在业务开发中使用非常广泛,比如分布式定时调度(在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求)和任务超时处理(以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单)。
定时/延时消息是 RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
但是RocketMQ的定时/延时消息不支持任意时间片,目前至RocketMQ-5.0.0版本之前(5.0使用了时间轮实现了更精确的)仅支持18个固定的时间段,其定义的延迟级别为:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
【关注公众号:认知科技技术团队】
RocketMQ:定时/延时消息的实现原理
先简单介绍一下消息的发送和存储及到消息可见的流程:
1、生产者根据从NameServer拉取的Topic路由信息,依据一定的队列选择算法,将消息发送到某个Broker;
2、消息到达Broker,不是直接存放至队列,而是保存到CommitLog文件;
消息存储由DefaultMessageStore处理,最终写入CommitLog文件。
3、DefaultMessageStore开启后台线程,周期性的读取CommitLog文件,写入消费者可见的队列以让消费者可以订阅到,同时对CommitLog文件的数据做索引,以便于查询搜索;
DefaultMessageStore开启后台任务:
private ReputMessageService reputMessageService;
this.reputMessageService.start();
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
RocketMQ定时/延时消息的实现,主要在上述第二做了些小动作,第二步,当判断消息是定时/延时消息时,会把原消息设置的topic设置为SCHEDULE_TOPIC_XXXX,根据延迟时间的设置,设置对应的延迟队列,然后消息再存入CommitLog文件中。
RocketMQ:定时/延时消息的实现流程:
1、生产者在发送消息时,设置延迟时间;
org.apache.rocketmq.common.message.Message#setDelayTimeLevel
org.apache.rocketmq.common.message.Message#setDelayTimeSec
org.apache.rocketmq.common.message.Message#setDelayTimeMs
org.apache.rocketmq.common.message.Message#setDeliverTimeMs
2、DefaultMessageStore在存储消息至CommitLog文件之前,重写消息的topic和队列信息,再写入CommitLog文件;
延迟消息topic名字:SCHEDULE_TOPIC_XXXX
public static final String RMQ_SYS_SCHEDULE_TOPIC
= "SCHEDULE_TOPIC_XXXX";
3、DefaultMessageStore开启的后台线程,周期性的读取CommitLog文件,对于延迟消息,则写入topic 为SCHEDULE_TOPIC_XXXX对应的18个队列中;
4、Broker后台每个延迟级别的延迟队列,都启动定时任务去消费延迟消息,根据延迟消息的原topic及队列信息,再次走消息发送流程,此时发送的消息不再是延迟消息,是消费者可见的。
Broker启动的时候会创建ScheduleMessageService。
ScheduleMessageService为每个延迟topic 为SCHEDULE_TOPIC_XXXX对应的18个延迟队列创建18个后台定时任务去消费延迟消息。
RocketMQ定时时间设置的限制的打破,如何实现任意时间精度的延时
RocketMQ延迟消息的延迟级别只有18种:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
在很多业务场景下受限制,不过我们可以利用RocketMQ延迟消息的设计自己做扩展,让消息发送到特殊(消费者不可见)topic对应的队列中,然后消息转存到第三方存储,开启定时轮询消息,再次投递到RocketMQ队列,让消费者可见。
实现可以参考滴滴开源的:DDMQ
https://gitee.com/didiopensource/DDMQ;
延迟消息转存到RocksDB,实现类似时序数据库的存储方式,利用其前缀搜索接口搜到某个时间内的消息,再次投递到RocketMQ。
https://gitee.com/didiopensource/DDMQ/blob/master/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java
小结
RocketMQ的定时/延时消息在业务开发中使用广泛,博文介绍了其实现原理及如何实现任意时间精度的延时,使用RocksDB实现类似时序数据库存储,打破RocketMQ定时时间设置的限制。