前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 如何实现延迟队列?答案可不止一种

RabbitMQ 如何实现延迟队列?答案可不止一种

作者头像
程序员鱼皮
发布2024-08-20 20:33:24
990
发布2024-08-20 20:33:24
举报
文章被收录于专栏:鱼皮客栈

回答重点

RabbitMQ 本身不支持延迟消息,但是可以通过它提供的两个特性 TTL(Time-To-Live and Expiration ,消息存活时间)、DLX(Dead Letter Exchanges,死信交换器) 来实现。还可以利用 RabbitMQ 插件来实现。

使用TTL + 死信队列:

在 RabbitMQ 中,通过设置消息的 TTL 和死信交换器可以实现延迟队列。

不给原队列(正常队列)设置消费者,当消息在原队列中达到 TTL 后,由于还未被消费,则会被转发到绑定的死信交换器,消费者从死信队列中消费消息,从而实现消息的延迟处理。

使用 RabbitMQ 插件:延迟消息插件(rabbitmq-delayed-message-exchange):

通过安装 RabbitMQ 的延迟消息插件,可以直接创建延迟交换器(Delayed Exchange)。

在发送消息时,指定消息的延迟时间,RabbitMQ 会在消息达到延迟时间后将其转发到对应的队列进行消费。

扩展知识

TTL 和 DLX 简要说明

  • TTL(Time-To-Live):指消息在队列中的存活时间。你可以为队列中的所有消息统一设置TTL,也可以为每条消息单独设置TTL。当消息超过TTL时,消息会被标记为过期。
  • 死信队列(DLX):当消息在原队列中过期、被拒绝(nack/reject)或队列已满时,消息会被转发到绑定的死信交换器(DLX)。DLX 可以将消息重新路由到死信队列(即这里的延迟队列)。

TTL + DLX 时序问题

因为队列的特点就是先进先出,如果发送的消息延迟的时间不同,例如第一个延迟 10s、第二个延迟 5s、第三个延迟 1s。

那么后面的消息,需要等 10s 的消息消费完才能消费。当 10s 消息未被消费,则后续的消息都会被阻塞,即使消息设置了更短的延迟。

这就是时序问题。如果一个队列中的消息延迟时间都一致,就可以避免这个问题,因此可以针对不同的延迟时间对应多创建几个队列。

或者可以利用延迟消息插件,插件不会有时序问题。

延迟消息插件原理

插件提供了一种新的交换器类型 x-delayed-message。这种交换器可以像普通的交换器一样,接收消息并根据路由键将消息路由到相应的队列。只不过 x-delayed-message 类型的交换机接收消息投递后,不会直接路由到队列中,而是存储到 Mnesia(Mnesia 是 Erlang 运行时中自带的一个数据库管理系统)中。

等到消息达到可投递时间,消息才会被投递到目标队列中。

更多关于插件的内容,可以查看 github:rabbitmq-delayed-message-exchange。

TTL + DLX 实现

代码语言:javascript
复制
public class DelayedQueueExample {

    private static final String EXCHANGE_NAME = "normal_exchange";
    private static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    private static final String QUEUE_NAME = "normal_queue";
    private static final String DELAYED_QUEUE_NAME = "delayed_queue";
    private static final String ROUTING_KEY = "routing_key";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            // 声明死信交换器
            channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, BuiltinExchangeType.DIRECT);

            // 声明延迟队列(用于接收过期消息)
            channel.queueDeclare(DELAYED_QUEUE_NAME, true, false, false, null);
            channel.queueBind(DELAYED_QUEUE_NAME, DEAD_LETTER_EXCHANGE, ROUTING_KEY);

            // 声明正常交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            // 配置带有死信交换器的正常队列
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
            argsMap.put("x-dead-letter-routing-key", ROUTING_KEY);
            argsMap.put("x-message-ttl", 5000); // 消息的TTL为5秒

            channel.queueDeclare(QUEUE_NAME, true, false, false, argsMap);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // 向正常交换器发送消息
            String message = "This is a delayed message!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Sent message: " + message);
        }
    }
}

延迟插件的实现

安装插件

首先,确保安装并启用了 rabbitmq_delayed_message_exchange 插件:

代码语言:javascript
复制
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
使用插件配置延迟队列

在安装插件后,可以通过设置 x-delayed-type 参数来创建支持延迟消息的交换机:

代码语言:javascript
复制
public class RabbitMQDelayedExchange {
    private static final String DELAYED_EXCHANGE = "delayed_exchange";
    private static final String DELAYED_QUEUE = "delayed_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明延迟交换机
            Map<String, Object> argsMap = new HashMap<>();
            argsMap.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());

            channel.exchangeDeclare(DELAYED_EXCHANGE, "x-delayed-message", true, false, argsMap);
            channel.queueDeclare(DELAYED_QUEUE, true, false, false, null);
            channel.queueBind(DELAYED_QUEUE, DELAYED_EXCHANGE, "");

            System.out.println("Delayed exchange and queue declared.");

            // 发送带有延迟时间的消息
            String message = "This is a delayed message";
            Map<String, Object> headers = new HashMap<>();
            headers.put("x-delay", 10000);  // 延迟 10 秒

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .headers(headers)
                    .build();

            channel.basicPublish(DELAYED_EXCHANGE, "", properties, message.getBytes("UTF-8"));
            System.out.println("Message sent with delay: " + message);
        }
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-08-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员鱼皮 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 回答重点
    • 使用TTL + 死信队列:
      • 使用 RabbitMQ 插件:延迟消息插件(rabbitmq-delayed-message-exchange):
      • 扩展知识
        • TTL 和 DLX 简要说明
          • TTL + DLX 时序问题
            • 延迟消息插件原理
              • TTL + DLX 实现
                • 延迟插件的实现
                  • 安装插件
                  • 使用插件配置延迟队列
              相关产品与服务
              数据库智能管家 DBbrain
              数据库智能管家(TencentDB for DBbrain,DBbrain)是腾讯云推出的一款为用户提供数据库性能、安全、管理等功能的数据库自治云服务。DBbrain 利用机器学习、大数据手段、专家经验引擎快速复制资深数据库管理员的成熟经验,将大量传统人工的数据库运维工作智能化,服务于云上和云下企业,有效保障数据库服务的安全、稳定及高效运行。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档