首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ进阶-延时消息

如果每小时轮询一次,最差的情况时间误差会达到1小时; 如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题; 既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决...在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。...6m 7m 8m 9m 10m 20m 30m 1h 2h"; 下面我们结合SprintBoot利用RocketMQ发送延时消息 引入RocketMQ组件 <groupId...1修改成了90s,生产者发送消息后需要90s后再进行消息投递。.../conf/broker.conf & 使用延时等级1发送消息 public void sendDelayMessage() { delayProduce.sendDelayMessage("delay-topic

2.7K31

延时消息常见实现方案

—1— 前言 延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费。...基于外部存储的方案本质上都是一个套路,将 MQ 和 延时模块 区分开来,延时消息模块是一个独立的服务/进程。延时消息先保留到其他存储介质中,然后在消息到期时再投递到 MQ。...延时消息的逻辑就是代理层实现了对延时消息的转发,如果是延时消息,会先投递到 RocketMQ 中 Chronos 专用的 topic 中。...至于第三个问题,估计是比较难解决的,需要在数据存储层将延时消息和正常消息区分开来,单独存储延时消息。...总结一下设计上的亮点: 时间轮算法适合延时/定时消息的场景,省去延时消息的排序,插入删除操作都是 O(1) 的时间复杂度; 通过多级时间轮设计,支持了超大时间跨度的延时消息; 通过延时加载,内存中只会有最近要消费的消息

91920
  • 您找到你想要的搜索结果了吗?
    是的
    没有找到

    延时消息常见实现方案

    基于外部存储的方案本质上都是一个套路,将 MQ 和 延时模块 区分开来,延时消息模块是一个独立的服务/进程。延时消息先保留到其他存储介质中,然后在消息到期时再投递到 MQ。...延时消息的逻辑就是代理层实现了对延时消息的转发,如果是延时消息,会先投递到 RocketMQ 中 Chronos 专用的 topic 中。...1,即一个queue只存相同延时消息,保证具有相同发送延时消息能够顺序消费。...至于第三个问题,估计是比较难解决的,需要在数据存储层将延时消息和正常消息区分开来,单独存储延时消息。 QMQ QMQ提供任意时间的延时/定时消息,你可以指定消息在未来两年内(可配置)任意时间内投递。...总结一下设计上的亮点: 时间轮算法适合延时/定时消息的场景,省去延时消息的排序,插入删除操作都是 O(1) 的时间复杂度; 通过多级时间轮设计,支持了超大时间跨度的延时消息; 通过延时加载,内存中只会有最近要消费的消息

    52420

    如何快速实现“延时消息”?

    有重复计算的嫌疑; (3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差会达到1小时; (4)如果通过增加cron轮询频率来减少时间误差,则轮询低效和重复计算的问题会进一步凸显; 对于这类需要延时执行的任务...答案是:高效延时消息。...高效延时消息,包含两个重要的数据结构: (1)环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组); (2)任务集合,环上每一个slot是一个Set; 同时,启动一个timer...如上图,假设当前Current Index指向第一格,当有延时消息到达之后,例如希望3610秒之后,触发一个延时消息任务,只需: (1)计算这个Task应该放在哪一个slot,现在指向1,3610秒之后...使用了“延时消息”方案之后,“订单48小时后关闭评价”的需求,只需将在订单关闭时,触发一个48小时之后的延时消息即可: (1)无需再轮询全部订单,效率高; (2)一个订单,任务只执行一次; (3)时效性好

    98130

    玩转redis-延时消息队列

    上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列 源码地址 使用demo 产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。...同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。...看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列 ?...轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。 还有一个问题,就是ZRANGEBYSCORE和list的pop不同,pop是取出元素并且会把元素在list中删除。...具体实现-code 添加延时消息,参数delay就是我们要延时多久: func (p *Producer) PublishDelayMsg(topicName string, body []byte,

    1.1K30

    ActiveMQ笔记(6):消息延时投递

    在开发业务系统时,某些业务场景需要消息定时发送或延时发送(类似:飞信的短信定时发送需求),这时候就需要用到activemq的消息延时投递,详细的文档可参考官网说明,本文只介绍二种常用的用法: 注:本文采用...24小时 logger.info("消息2:[" + message2 + "] 延时发送成功!")...时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ") 发送成功后,可以登录activemq的webconsole查看消息的属性: 在scheduled面板中,可以看到延时消息 ?...注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。...此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息延时消息,参考下图: ?

    2K50

    MQ教程 | 基于RabbitMQ消息延时队列

    消息的TTL(Time To Live) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。...所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。...,通过传递 { expiration: '30000'} 将这条消息的过期时间设为了 30秒,对消息设置 30秒 钟过期,这条消息并不一定就会在30秒钟后被丢弃或进入死信,只有当这条消息到达队首即将被消费时才会判断其是否过期...上面的消息的TTL到了,消息过期了。 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。 当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。...▍定时任务 因为队列中的消息过期后会成为死信,而死信又会被发布到该消息所在的队列的 DLX 上去,所以通过为消息设置过期时间,然后再消费该消息所在队列的 DLX 所绑定的队列,从而来达到定时处理一个任务的目的

    3.8K30

    使用 Kotlin+RocketMQ 实现延时消息

    延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。...除此之外,还可以使用消息队列来实现延时消息,例如 RocketMQ。 二....RocketMQ 实现延时消息 3.1 业务背景 我们的系统完成某项操作之后,会推送事件消息到业务方的接口。...3.2 生产者(Producer) 生产者负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。 首先,定义一个支持延时发送的 AbstractProducer。...总结 正如本文开头曾介绍过,可以使用多种方式来实现延时消息。然而,我们的系统本身就大量使用了 RocketMQ,借助成熟的 RocketMQ 实现延时消息不失为一种可靠而又方便的方式。

    82330

    阿里大牛带你轻松实现RabbitMQ 延时消息

    RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是定时。...当然,解决了延时,定时就很简单了(定时=当前时刻+间隔时间)。 由于RabbitMQ本身不支持延时队列(延时消息),所以要通过其他方式来实现。...如果队列没有指定DLX或者无法被路由到一个DLQ,则队列中过期的消息会被直接丢弃。 因此,我们可以利用消息TTL的特性,实现消息延时投递。...3 使用死信队列实现延时消息的缺点: 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。...RabbitMQ 延时消息的实现(下) 在RabbitMQ 3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。

    1.5K40

    Handler的消息延时是怎么实现的

    消息延时做了什么特殊处理? 是发送延时,还是消息处理延时延时的精度如何? ?...通常我们使用Handler的消息延时都是调用sendMessageDelayed函数实现的,其中delayMillis是需要延时的毫秒。 ?...从上面可以看到,Handler中sendMessageDelayed方法只是将消息按照要执行的先后顺序插入到消息队列中的,插入好了并不意味着就会按照设定的延时时间处理消息,那Handler时如何延时处理该消息的呢...Handler的消息延时的实现: 消息队列在插入消息的时候是按照消息的触发时间顺序排序的,先执行的消息放在单链表的头部,最后执行的消息放在单链表的尾部; 在消息执行的过程中,通过native层设置epoll_wait...关于延时精度:Handler的延时精度并不高,会受到前一个消息处理时间的影响,因为在Looper.loop()方法中,只有上一个消息被处理完之后,才会去queue中读取下一个消息。 ‍

    2.3K10

    基于spring event实现消息异步延时队列

    这里面有个业务场景是需要给相应的人员发送消息通知。 之前做分布式应用都是引入第三方组件mq,单独部署一个消息服务用于接收mq消息并发送对应的通知。...有问题找百度,看看是不是有人已经解决过这个问题了,结果找了半天,就找到类似的,但是做法就是发送的时候自己搞了一个延时队列,到时间后再执行发送,感觉实现也算实现了,但是感觉和我想要的不一样。...既然@async本身就是交给了线程池处理,而我使用的是ThreadPoolTaskExecutor,这个线程池不支持延时队列,解决办法就只能是sleep,于是乎我换成了有延时队列的线程池ScheduledThreadPoolExecutor...然后就在抽象基类里加了个延时枚举,重启后把断点打在了submit上,如图。 看到这个有点晕,找找有没有我的消息内容吧,然后就找到了。...正想着放弃呀,反正固定延时目前也行,但是看着这个arg$2总感觉是既熟悉又陌生,实体要是断点出来好像这个就是对象的属性名称,要不直接反射拿下值试试?那就试试吧。结果就写代码试。 结果断点下取到了。

    48310

    Redis应用-异步消息队列与延时队列

    异步消息队列 说道消息队列,你肯定会想到Kafka、Rabbitmq等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。...如果你对消息队列没那么高要求,想要轻量级的,使用Redis就没错啦。...所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。...延迟队列 你是否在做电商项目的时候会遇到如下场景: 订单下单后超过一小时用户未支付,需要关闭订单 订单的评论如果7天未评价,系统需要自动产生一条评论 这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行...我们可以将有序集合的value设置为我们的消息任务,把value的score设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。

    70810

    Redis应用-异步消息队列与延时队列

    异步消息队列 说道消息队列,你肯定会想到Kafka、Rabbitmq等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。...如果你对消息队列没那么高要求,想要轻量级的,使用Redis就没错啦。...所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加。这个问题我们可以通过blpop/brpop 来阻塞读取队列。...延迟队列 你是否在做电商项目的时候会遇到如下场景: 订单下单后超过一小时用户未支付,需要关闭订单 订单的评论如果7天未评价,系统需要自动产生一条评论 这个时候我们就需要用到延时队列了,顾名思义就是需要延迟一段时间后执行...我们可以将有序集合的value设置为我们的消息任务,把value的score设置为消息的到期时间,然后轮询获取有序集合的中的到期消息进行处理。

    69120

    5 张图带你理解 RocketMQ 延时消息机制

    今天来聊一聊 RocketMQ 的延时消息是怎么实现的。 延时消息是指发送到 RocketMQ 后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。...延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。...因为延时级别有 18 个,所以这里有 18 个队列。如下图: 2.2 调度消息 延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类 ScheduleMessageService。...3 小时才能消费,而 RocketMQ 的延时消息最大延时级别只支持延时 2 小时,怎么处理?...3 总结 经过上面的讲解,延时消息的处理流程如下: 最后,延时消息延时时间并不精确,这个时间是 Broker 调度线程把消息重新投递到原始的 MessageQueue 的时间,如果发生消息积压或者 RocketMQ

    40820

    手把手实现一条延时消息

    延时消息:订单创建的时候发送一条 N 分钟到期的信息,一旦消息消费后便可判断订单是否可以取消。...延时消息 这里我们不过多讨论这类需求如何实现;重点聊聊这个延时消息,看它是如何实现的,基于实现延时消息的数据结构还能实现定时任务。...我在之前的开源 IM 项目中也加入了此类功能,可以很直观的发送一条延时消息,效果如下: 使用 :delay hahah 2 发送了一条两秒钟的延时消息,另外一个客户端将会在两秒钟之后收到该消息。...task1 = 而如果是一个 10s 的延时消息,则需要将它放到下标为 2 的槽中,但同时需要记录它所对应的圈数,不然就和 2 秒的延时消息重复了。...我这里所使用的场景在文初也提到了,就是真的发送一条延时消息; 现有的消息都是实时消息,所以要实现一个延时消息便是在现有的发送客户端处将延时消息放入到这个时间轮中,在任务到期时再执行真正的消息发送逻辑。

    31140

    ActiveMQ笔记(7):如何清理无效的延时消息

    ActiveMQ的延时消息是一个让人又爱又恨的功能,具体使用可参考上篇ActiveMQ笔记(6):消息延时投递,在很多需要消息延时投递的业务场景十分有用,但是也有一个缺陷,在一些大访问量的场景,如果瞬间向...下面的代码演示了,如何清理activemq中的延时消息(包括:全部清空及清空指定时间段的延时消息),这也是目前唯一可行的办法。...先来模拟瞬间向MQ发送大量延时消息: /** * 发送延时消息 * * @param messageUtil */ private static...下面的代码可以清空所有延时消息: /** * 删除所有延时消息 * * @param connectionFactory * @throws JMSException...,有些用力过猛了,很多时候,我们只需要清理掉过期的延时消息(即:本来计划是8:00投递出去的消息,结果过了8点还没投递出去)  /** * 删除过期的延时消息 *

    1.9K100

    弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息

    在 RocketMQ 4.x 版本,使用延时消息来实现消息的定时消费。延时消息可以一定程度上实现定时发送,但是有一些局限。...1 延时消息 1.1 简介 RocketMQ 的延时消息是指 Producer 发送消息后,Consumer 不会立即消费,而是需要等待固定的时间才能消费。...在一些场景下,延时消息是很有用的,比如电商场景下关闭 30 分钟内未支付的订单。 使用延时消息非常简单,只需要给消息的 delayTimeLevel 属性赋值就可以。...,延时级别成百上千,而且随时可能增加新的延时时间; 3.延时时间不准确,后台的定时线程可能会因为处理消息量大导致延时误差大。...3 总结 RocketMQ 4.x 版本只支持延时消息,有一些局限性。而 RocketMQ 新版本引入了定时消息,弥补了延时消息的不足。

    2K31
    领券