延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
需求:
实现方式:
对于上面两种需求,一般有两种实现方式:
很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
下面我们就采用 TTL+死信队列 的组合实现延迟队列的功能。
<!--
延迟队列:
1. 定义正常交换机(order_exchange)和队列(order_queue)
2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
3. 绑定,设置正常队列过期时间为30分钟
-->
<!-- 1. 定义正常交换机(order_exchange)和队列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!-- 3. 绑定,设置正常队列过期时间为30分钟-->
<rabbit:queue-arguments>
<!-- 转发死信交换机 -->
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<!-- 转发死信的routingkey -->
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<!-- 设置 TTL 10秒,模拟30分钟-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2022年03月06日16:41:47");
//2.打印倒计时10秒
for (int i = 0; i < 10; i++) {
System.out.println("倒计数: " + i);
Thread.sleep(1000);
}
}
执行之后,从控制面板可以看到消息已经进入了正常队列:
等待过期时间过后,消息未消费则自动进入死信队列:
从死信队列中查看消息,确认消息内容:
package com.lijw.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author Aron.li
* @date 2022/3/4 23:36
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1. 获取传递的标签,用于消息确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2. 接收消息
System.out.println(new String(message.getBody()));
//3. 处理业务逻辑
System.out.println("处理业务逻辑...");
System.out.println("根据订单id查询其状态...");
System.out.println("判断状态是否为支付成功");
System.out.println("取消订单,回滚库存....");
//4. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//e.printStackTrace();
//5. 拒绝签收, 设置不重复队列
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
}
}
}
<!-- 定义监听器与队列的绑定 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<!--延迟队列效果实现: 一定要监听的是 死信队列!!!-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"/>
</rabbit:listener-container>
package com.lijw;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Aron.li
* @date 2022/3/4 23:41
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test01() {
while (true) {
}
}
}
@Test
public void testDelay() throws InterruptedException {
//1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2022年03月06日16:41:47");
//2.打印倒计时10秒
for (int i = 0; i < 10; i++) {
System.out.println("倒计数: " + i);
Thread.sleep(1000);
}
}
执行发送订单消息,倒计时10秒:
10秒结束后,消费者监听器接收到订单信息: