前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >16-RabbitMQ高级特性-延迟队列

16-RabbitMQ高级特性-延迟队列

作者头像
Devops海洋的渔夫
发布2023-02-10 13:57:58
2740
发布2023-02-10 13:57:58
举报
文章被收录于专栏:Devops专栏Devops专栏

16-RabbitMQ高级特性-延迟队列

延迟队列

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

实现方式:

对于上面两种需求,一般有两种实现方式:

  1. 定时器
  2. 延迟队列
  • 定时器:设置一个轮询时间,间隔一段时间对数据库进行扫描对比,当符合定时的数据则进行处理; 缺点:
    • 不优雅,因为不管设置多少间隔时间,都会对数据库产生多次扫描的执行,影响性能;
    • 而且间隔的时间范围对具体时间点存在一定的误差,可能没有扫描到,例如:间隔时间设置为1分钟,那么订单可能在29分或者31分钟几秒,那么则扫描不到,这样就会影响用户体验。
  • 延迟队列: 通过延迟队列控制消息,不会对数据库多次扫描,只有当消息达到了一定的时间,才发送至消费端处理即可,非常优雅!

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

案例

下面我们就采用 TTL+死信队列 的组合实现延迟队列的功能。

1.【生产者】配置正常的交换机、队列和死信交换机、队列

代码语言:javascript
复制
<!--
    延迟队列:
        1. 定义正常交换机(order_exchange)和队列(order_queue)
        2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
        3. 绑定,设置正常队列过期时间为30分钟
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
    <!-- 3. 绑定,设置正常队列过期时间为30分钟-->
    <rabbit:queue-arguments>
        <!-- 转发死信交换机 -->
        <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
        <!-- 转发死信的routingkey -->
        <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
        <!-- 设置 TTL 10秒,模拟30分钟-->
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
    <rabbit:bindings>
        <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

<!--  2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
    <rabbit:bindings>
        <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
    </rabbit:bindings>
</rabbit:topic-exchange>

2.【生产者】编写发送订单消息至正常队列,验证:TTL过期后,自动进入死信队列中

代码语言:javascript
复制
    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2022年03月06日16:41:47");
        
        //2.打印倒计时10秒
        for (int i = 0; i < 10; i++) {
            System.out.println("倒计数: " + i);
            Thread.sleep(1000);
        }
    }

执行之后,从控制面板可以看到消息已经进入了正常队列:

等待过期时间过后,消息未消费则自动进入死信队列:

从死信队列中查看消息,确认消息内容:

3.【消费者】编写监听器类,接收延迟的订单消息

代码语言:javascript
复制
package com.lijw.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;


/**
 * @author Aron.li
 * @date 2022/3/4 23:36
 */
@Component
public class OrderListener implements ChannelAwareMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        //1. 获取传递的标签,用于消息确认
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //2. 接收消息
            System.out.println(new String(message.getBody()));

            //3. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");

            //4. 手动签收
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //e.printStackTrace();

            //5. 拒绝签收, 设置不重复队列
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag, true, false);
        }
    }
}

4.【消费者】配置监听器 监听死信队列,进行消费延迟订单信息

代码语言:javascript
复制
<!--  定义监听器与队列的绑定  -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
    <!--延迟队列效果实现:  一定要监听的是 死信队列!!!-->
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
</rabbit:listener-container>

5.【消费者】启动测试代码,启动Spring框架,开启监听器

代码语言:javascript
复制
package com.lijw;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * @author Aron.li
 * @date 2022/3/4 23:41
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {

    @Test
    public void test01() {
        while (true) {

        }
    }

}

6. 【生产者】发送一条消息,验证:消息 --> 正常队列 TTL过期 --> 死信队列 --> 消费订单信息

代码语言:javascript
复制
@Test
public void testDelay() throws InterruptedException {
    //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
    rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2022年03月06日16:41:47");

    //2.打印倒计时10秒
    for (int i = 0; i < 10; i++) {
        System.out.println("倒计数: " + i);
        Thread.sleep(1000);
    }
}

执行发送订单消息,倒计时10秒:

10秒结束后,消费者监听器接收到订单信息:

小结

  1. 延迟队列 指消息进入队列后,可以被延迟一定时间,再进行消费。
  2. RabbitMQ没有提供延迟队列功能,但是可以使用 :TTL + DLX 来实现延迟队列效果。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-01-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 海洋的渔夫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 16-RabbitMQ高级特性-延迟队列
    • 延迟队列
      • 案例
        • 1.【生产者】配置正常的交换机、队列和死信交换机、队列
        • 2.【生产者】编写发送订单消息至正常队列,验证:TTL过期后,自动进入死信队列中
        • 3.【消费者】编写监听器类,接收延迟的订单消息
        • 4.【消费者】配置监听器 监听死信队列,进行消费延迟订单信息
        • 5.【消费者】启动测试代码,启动Spring框架,开启监听器
        • 6. 【生产者】发送一条消息,验证:消息 --> 正常队列 TTL过期 --> 死信队列 --> 消费订单信息
      • 小结
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档