死信(dead message
)简单理解就是因为种种原因,无法被消费的信息,就是死信
有死信,自然就有死信队列。当消息在一个队列中变成死信之后,他能被重新发送到另一个交换器中,这个交换器就是 DLX
(Dead Letter Exchange
),绑定 DLX
的队列,就称为死信对类(Dead Letter Queue
,简称 DLQ
)
消息变成死信一般是由于以下几种情况:
Basic.Reject/Basic.Nack
),并设置 requeue
参数为 false
包含两部分:
死信交换机和死信队列和普通的交换机,队列没有区别
// 1. 正常部分
// 正常交换机
@Bean("normalExchange")
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(Constant.NORMAL_EXCHANGE_NAME).durable(true).build();
}
// 正常队列
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE).build();
}
// 正常队列和交换机绑定
@Bean("normalBinding")
public Binding normalBinding(@Qualifier("normalExchange") Exchange exchange, @Qualifier("normalQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
// 2. 死信部分
// 死信交换机
@Bean("dlxExchange")
public Exchange dlxExchange() {
return ExchangeBuilder.topicExchange(Constant.DLX_EXCHANGE_NAME).durable(true).build();
}
// 死信队列
@Bean("dlxQueue")
public Queue ndlxQueue() {
return QueueBuilder.durable(Constant.DLX_QUEUE).build();
}
// 死信队列和交换机绑定
@Bean("dlxBinding")
public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("dlx").noargs();
}
当这个队列中存在死信时,RabbitMQ
会自动地把这个消息发布到设置的 DLX
上,进而被路由到另一个队列,即死信队列
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 绑定死信队列
arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);
// 设置发送给死信队列的 RoutingKey arguments.put("x-dead-letter-routing-key", "dlx");
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为:
@Bean("normalQueue")
public Queue normalQueue() {
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx").build();
}
@Bean("normalQueue")
public Queue normalQueue() {
Map<String, Object> arguments = new HashMap<>();
// 1. 绑定死信队列
arguments.put("x-dead-letter-exchange", Constant.DLX_EXCHANGE_NAME);
// 设置发送给死信队列的 RoutingKey arguments.put("x-dead-letter-routing-key", "dlx");
// 2. 制造死信产生的条件
arguments.put("x-message-ttl", 10000); // 10 秒过期
arguments.put("x-max-length", 10); // 队列长度
return QueueBuilder.durable(Constant.NORMAL_QUEUE).withArguments(arguments).build();
}
简写为:
return QueueBuilder.durable(Constant.NORMAL_QUEUE)
.deadLetterExchange(Constant.DLX_EXCHANGE_NAME)
.deadLetterRoutingKey("dlx")
.ttl(10*1000)
.maxLength(10L)
.build();
@RequestMapping("/dlx")
public void dlx() {
// 测试过期时间,当时间到达 TTL,消息自动进入到死信队列
rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test...");
// 测试队列长度
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constant.DLX_EXCHANGE_NAME, "normal", "dlx test...");
}
// 测试消息拒收
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
程序启动之后,观察队列
队列 Features
说明:
D
:durable
的缩写,设置持久化TTL
:Time to Live
,队列设置了 TTL
Lim
:队列设置了长度(x-max-length
)DLX
:队列设置了死信交换机(x-dead-letter-exchange
)DLK
:队列设置了死信 RoutingKey
(x-dead-letter-routing-key
)测试过期时间,到达过期时间之后,进入死信队列
调用接口,发送消息:
发送之后:
10 秒后,消息进入到死信队列
生产者首先发送一条消息,然后经过见换气(normal_exchange
)顺利地存储到队列(normal_queue
)中
normal_queue
设置了过期时间为 10 秒,在这 10s
内没有消费者消费这条消息,那么判定这条消息过期DLX
,过期之后,消息会被丢给交换器(dlx_exchange
)中,这时根据 RoutingKey
匹配,找到匹配的队列(dlx_queue
),最后消息被存储在 queue.dlx
这个死信队列中测试达到队列长度,消息进入死信队列
队列长度设置为 10,我们发送 20 条数据,会有 10 条数据直接进入到死信队列
发送 20 条消息
// 测试队列长度
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(Constant.NORMAL_EXCHANGE_NAME, "normal", "dlx test...");
}
运行后:
过期之后,正常队列的 10 条消息也会进入到死信队列
测试消息拒收
写消费者代码,并强制异常,测试拒绝签收
@Component
public class DlxQueueListener {
// 指定监听队列的名称
@RabbitListener(queues = Constant.NORMAL_QUEUE)
public void ListenerQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
// 模拟处理失败
int num = 3 / 0;
System.out.println("处理完成");
// 3. 手动签收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 4. 异常了就拒绝签收
Thread.sleep(1000);
// 第三个参数 requeue,是否重新发送。true,会重新发送;false,不会重新发送
channel.basicNack(deliveryTag, true, false);
}
}
// 指定监听队列的名称
@RabbitListener(queues = Constant.DLX_QUEUE)
public void ListenerDLXQueue(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.printf("接收到消息:%s, deliveryTag: %d%n", new String(message.getBody(), "UTF-8"),
message.getMessageProperties().getDeliveryTag());
}
}
发送消息
#高频面试
死信队列作为 RabbitMQ
的高级特征,也是面试的一大重点
死信(Dead Letter
)是消息队列中的一种特殊消息,它指的是那些无法正常消费或处理的消息。
在消息队列系统中,如 RabbitMQ
,死信队列用于存储这些死信信息
TTL
requeue=false
),消息也会成为死信对于 RabbitMQ
来说,死信队列是一个非常有用的特性。
它可以处理异常情况下,消息不能够被消费者正确消费而被置入死信队列中的情况,应用程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统
场景的应用还有: