前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过 rabbitmq 的 TTL 与 DLX 设置实现延时队列

通过 rabbitmq 的 TTL 与 DLX 设置实现延时队列

作者头像
用户3147702
发布2022-06-27 13:20:51
6690
发布2022-06-27 13:20:51
举报
文章被收录于专栏:小脑斧科技博客

1. 概述

rabbitmq 是目前使用最为普及的消息队列组件,基于 AMQP 的 rabbitmq 在各方面设计都比较完善,同时,它具有非常丰富的功能与特性,可以支持各种实际的适用场景。 但是 rabbitmq 本身并不直接支持延时队列的功能,本文我们就来介绍一下,如何通过 rabbitmq 的特性实现一个延时队列。

2. 延时队列的简易实现

延时队列就是只有当消息在队列中存放达到指定的时间后,才可以被消费,他的应用场景通常并不多,但在此前我们介绍的秒杀系统中非常常用。 当用户提交订单指定时间后没有支付,那么用户的订单应该被取消以便其他用户可以继续抢购,这样的情况下,延时队列就非常有必要了。

之前的文章中,我们使用 redis 集群来实现了这个功能,redis 中存储了下单时间,以分钟为粒度扫描相应的 key,即可扫出所有下单时间超过指定时间间隔的数据。 rabbitmq 也可以依照上述理论,定时取出所有消息,时间间隔不足的则放回队列。 这样的方法优势在于实现简单,但是显然性能较低,虽然 rabbitmq 不支持延时队列的功能,但是我们依然可以借用 rabbitmq 的消息过期机制与失效消息转发机制来实现我们需要的延时队列功能。

3. rabbitmq 与消息过期时间 — TTL

3.1. 为队列设置消息过期时间

rabbitmq 支持在创建队列时对队列设置消息过期时间:

代码语言:javascript
复制
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

也可以通过 rabbitmqctl 命令对符合条件的队列设置消息过期时间规则:

代码语言:javascript
复制
rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues
<code>

<h3>为消息设置消息过期时间
同时,rabbitmq 也支持设置单条消息的过期时间:
<code mode="java">
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

如果同时设置了队列和消息的过期时间,那么消息实际的过期时间将会是两个设置值的较小值。

4. 失效消息转发队列 — DLX

一旦上述消息过期时间设置生效,某条消息达到消息过期时间,那么他将会成为一条“dead-lettered”,此外,被拒绝的消息如果 requeue 属性为 false,或者消息所在队列已达到最大长度,那么他也将成为“dead-lettered”。 如果我们设置了 DLX 规则,即失效消息转发规则,那么失效的消息就会被转发到相应的 exchange 和 queue。

4.1. 通过代码设置失效消息转发队列

我们可以通过下列代码进行设置:

代码语言:javascript
复制
channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
channel.queueDeclare("myqueue", false, false, false, args);

这样,一旦消息失效,则消息会被自动转发到你设置的 x-dead-letter-exchange 上的同名队列。 你也可以通过下面的代码指定具体转发的目标 routing-key:

代码语言:javascript
复制
args.put("x-dead-letter-routing-key", "some-routing-key");

4.2. 通过 rabbitmqctl 命令设置失效消息转发队列

同样你也可以通过 rabbitmqctl 命令设置失效消息转发队列:

代码语言:javascript
复制
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues

如果你需要指定转发的具体消息队列,你需要为消息指定 x-dead-letter-routing-key 属性。

5. 综述

进行了上述设置以后,消息就会在你指定的延时时间后自动被转发到相应的消息队列中,你需要做的就是去转发后的目标队列中实时取出消息,一个延时队列就这样应运而生了。

6. 参考文献

RabbitMQ TTL — http://www.rabbitmq.com/ttl.html。 RabbitMQ DLX — http://www.rabbitmq.com/dlx.html。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 延时队列的简易实现
  • 3. rabbitmq 与消息过期时间 — TTL
    • 3.1. 为队列设置消息过期时间
    • 4. 失效消息转发队列 — DLX
      • 4.1. 通过代码设置失效消息转发队列
        • 4.2. 通过 rabbitmqctl 命令设置失效消息转发队列
        • 5. 综述
        • 6. 参考文献
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档