死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况:
队列绑定死信交换机:
给队列设置参数:x-dead-letter-exchange 和 x-dead-letter-routing-key
下面我们编写生产者、消费者,根据以下三种情况进行演示。
消息成为死信的三种情况:
<!--
死信队列:
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
3. 正常队列绑定死信交换机
设置两个参数:
* x-dead-letter-exchange:死信交换机名称
* x-dead-letter-routing-key:发送给死信交换机的routingkey
-->
<!--
1. 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常队列绑定死信交换机-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交换机名称-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey, value只要符合死信交换机的routingkey规则-->
<entry key="x-dead-letter-routing-key" value="dlx.hehe" />
<!--4.1 设置队列的过期时间 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 设置队列的长度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2. 声明死信队列(queue_dlx)和死信交换机(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//1. 测试过期时间:当消息在正常队列 test_queue_dlx 时间过期未被消费 --自动进入--> 死信队列 queue_dlx
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hahahah", "这将会成为一条死信消息嘛?");
}
执行之后,从控制面板可以看到消息已经进入了正常队列:
等待过期时间过后,消息未消费则自动进入死信队列:
从死信队列中查看消息,确认消息内容:
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//2. 测试长度限制:当消息超出正常队列的最大长度10,那么后续的消息则自动转入 --> 死信队列 queue_dlx
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hahahah", "这将会成为一条死信消息嘛?");
}
}
执行之后,查询管理页面可以发现,正常队列只有10条消息,超出正常队列长度限制的10条消息自动进入死信队列:
等待正常队列的消息过期后,全部进入死信队列:
package com.lijw.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* @author Aron.li
* @date 2022/3/4 23:36
*/
@Component
public class DlxListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1. 获取传递的标签,用于消息确认
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2. 接收消息
System.out.println(new String(message.getBody()));
//3. 处理业务逻辑
System.out.println("处理业务逻辑...");
int i = 3 / 0; // 产生异常
//4. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//5. 拒绝签收, 设置不重复队列
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag, true, false);
}
}
}
<!-- 定义监听器与队列的绑定 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1">
<rabbit:listener ref="dlxListener" queue-names="test_queue_dlx"/>
</rabbit:listener-container>
package com.lijw;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Aron.li
* @date 2022/3/4 23:41
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-consumer.xml")
public class ConsumerTest {
@Test
public void test01() {
while (true) {
}
}
}
/**
* 发送测试死信消息:
* 1. 过期时间
* 2. 长度限制
* 3. 消息拒收
*/
@Test
public void testDLX(){
//3. 测试消息被消费者拒收:自动进入 --> 死信队列 queue_dlx
rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.hahahah", "这将会成为一条死信消息嘛?");
}
执行发送消息之后,消费者监听器获取到消息如下:
此时消费者拒绝签收消息,消息直接进入死信队列: