前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RocketMQ实践: 定时/延时消息的实现原理及如何实现任意时间精度的延时,打破RocketMQ定时时间设置的限制

RocketMQ实践: 定时/延时消息的实现原理及如何实现任意时间精度的延时,打破RocketMQ定时时间设置的限制

作者头像
崔认知
发布2023-06-20 11:12:43
发布2023-06-20 11:12:43
4.4K00
代码可运行
举报
文章被收录于专栏:nobodynobody
运行总次数:0
代码可运行

简介


定时/延时消息在业务开发中使用非常广泛,比如分布式定时调度(在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求)和任务超时处理(以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单)。

定时/延时消息是 RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

但是RocketMQ的定时/延时消息不支持任意时间片,目前至RocketMQ-5.0.0版本之前(5.0使用了时间轮实现了更精确的)仅支持18个固定的时间段,其定义的延迟级别为

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

【关注公众号:认知科技技术团队】

RocketMQ:定时/延时消息的实现原理


先简单介绍一下消息的发送和存储及到消息可见的流程

1、生产者根据从NameServer拉取的Topic路由信息,依据一定的队列选择算法,将消息发送到某个Broker;

2、消息到达Broker,不是直接存放至队列,而是保存到CommitLog文件;

消息存储由DefaultMessageStore处理,最终写入CommitLog文件。

3、DefaultMessageStore开启后台线程,周期性的读取CommitLog文件,写入消费者可见的队列以让消费者可以订阅到,同时对CommitLog文件的数据做索引,以便于查询搜索;

DefaultMessageStore开启后台任务:

代码语言:javascript
代码运行次数:0
运行
复制
private ReputMessageService reputMessageService;

代码语言:javascript
代码运行次数:0
运行
复制
  this.reputMessageService.start();
代码语言:javascript
代码运行次数:0
运行
复制
org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run

RocketMQ定时/延时消息的实现,主要在上述第二做了些小动作,第二步,当判断消息是定时/延时消息时,会把原消息设置的topic设置为SCHEDULE_TOPIC_XXXX,根据延迟时间的设置,设置对应的延迟队列,然后消息再存入CommitLog文件中

RocketMQ:定时/延时消息的实现流程:

1、生产者在发送消息时,设置延迟时间;

代码语言:javascript
代码运行次数:0
运行
复制
org.apache.rocketmq.common.message.Message#setDelayTimeLevel
org.apache.rocketmq.common.message.Message#setDelayTimeSec
org.apache.rocketmq.common.message.Message#setDelayTimeMs
org.apache.rocketmq.common.message.Message#setDeliverTimeMs

2、DefaultMessageStore在存储消息至CommitLog文件之前,重写消息的topic和队列信息,再写入CommitLog文件;

延迟消息topic名字:SCHEDULE_TOPIC_XXXX

代码语言:javascript
代码运行次数:0
运行
复制
public static final String RMQ_SYS_SCHEDULE_TOPIC 
= "SCHEDULE_TOPIC_XXXX";

3、DefaultMessageStore开启的后台线程,周期性的读取CommitLog文件,对于延迟消息,则写入topic 为SCHEDULE_TOPIC_XXXX对应的18个队列中;

4、Broker后台每个延迟级别的延迟队列,都启动定时任务去消费延迟消息,根据延迟消息的原topic及队列信息,再次走消息发送流程,此时发送的消息不再是延迟消息,是消费者可见的。

Broker启动的时候会创建ScheduleMessageService。

ScheduleMessageService为每个延迟topic 为SCHEDULE_TOPIC_XXXX对应的18个延迟队列创建18个后台定时任务去消费延迟消息。

RocketMQ定时时间设置的限制的打破,如何实现任意时间精度的延时


RocketMQ延迟消息的延迟级别只有18种:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

在很多业务场景下受限制,不过我们可以利用RocketMQ延迟消息的设计自己做扩展,让消息发送到特殊(消费者不可见)topic对应的队列中,然后消息转存到第三方存储,开启定时轮询消息,再次投递到RocketMQ队列,让消费者可见。

实现可以参考滴滴开源的:DDMQ

https://gitee.com/didiopensource/DDMQ;

延迟消息转存到RocksDB,实现类似时序数据库的存储方式,利用其前缀搜索接口搜到某个时间内的消息,再次投递到RocketMQ

https://gitee.com/didiopensource/DDMQ/blob/master/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPushService.java

小结


RocketMQ的定时/延时消息在业务开发中使用广泛,博文介绍了其实现原理及如何实现任意时间精度的延时,使用RocksDB实现类似时序数据库存储,打破RocketMQ定时时间设置的限制。


本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 认知科技技术团队 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档