首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ死信队列在SpringBoot中的使用

RabbitMQ死信队列在SpringBoot中的使用

作者头像
喜欢天文的pony站长
发布2020-06-29 12:16:31
1.1K0
发布2020-06-29 12:16:31
举报
文章被收录于专栏:RabbitMQ实战RabbitMQ实战

死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。

# 概念:

  • 消息会变成死信消息的场景:
    1. 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝签收,并且重新入队为false。1.1 有一种场景需要注意下:消费者设置了自动ACK,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject
    2. 消息过期,过了TTL存活时间。
    3. 队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
  • 代码编写流程是:
    1. 有一个(n个)正常业务的Exchange,比如为user-exchange
    2. 有一个(n个)正常业务的Queue,比如为user-queue。(因为该队列需要绑定死信交换机,所以需要加俩参数:死信交换机:x-dead-letter-exchange,死信消息路由键:x-dead-letter-routing-key
    3. 进行正常业务的交换机和队列绑定。
    4. 定义一个死信交换机,比如为common-dead-letter-exchange
    5. 将正常业务的队列绑定到死信交换机(队列设置了x-dead-letter-exchange即会自动绑定)。
    6. 定义死信队列user-dead-letter-queue用于接收死信消息,绑定死信交换机。
  • 业务流程是:
    1. 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。
    2. 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。
    3. 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。
    4. 消息到达死信队列后,可监听该死信队列,处理死信消息。
  • 死信交换机死信队列也是普通的交换机和队列,只不过是我们人为的将某个交换机和队列来处理死信消息。
  • 流程图

图片.png

# 代码实现

  1. 配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: futao
    password: 123456789
    virtual-host: deadletter-vh
    connection-timeout: 15000
    # 发送确认
    publisher-confirms: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 签收模式为手动签收-那么需要在代码中手动ACK
        acknowledge-mode: manual

app:
  rabbitmq:
    # 队列定义
    queue:
      # 正常业务队列
      user: user-queue
      # 死信队列
      user-dead-letter: user-dead-letter-queue
    # 交换机定义
    exchange:
      # 正常业务交换机
      user: user-exchange
      # 死信交换机
      common-dead-letter: common-dead-letter-exchange
  1. 队列、交换机定义与绑定。
/**
 * 队列与交换机定义与绑定
 *
 * @author futao
 * @date 2020/4/7.
 */
@Configuration
public class Declare {

        /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                .build();
    }

    /**
     * 用户交换机
     *
     * @param userExchangeName 用户交换机名
     * @return
     */
    @Bean
    public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) {
        return ExchangeBuilder
                .topicExchange(userExchangeName)
                .durable(true)
                .build();
    }

    /**
     * 用户队列与交换机绑定
     *
     * @param userQueue    用户队列名
     * @param userExchange 用户交换机名
     * @return
     */
    @Bean
    public Binding userBinding(Queue userQueue, Exchange userExchange) {
        return BindingBuilder
                .bind(userQueue)
                .to(userExchange)
                .with("user.*")
                .noargs();
    }

    /**
     * 死信交换机
     *
     * @param commonDeadLetterExchange 通用死信交换机名
     * @return
     */
    @Bean
    public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return ExchangeBuilder
                .topicExchange(commonDeadLetterExchange)
                .durable(true)
                .build();
    }


   /**
     * 用户队列的死信消息 路由的队列
     * 用户队列user-queue的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列
     * 用这个队列来接收user-queue的死信消息
     *
     * @return
     */
    @Bean
    public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) {
        return QueueBuilder
                .durable(userDeadLetterQueue)
                .build();
    }

    /**
     * 死信队列绑定死信交换机
     *
     * @param userDeadLetterQueue      user-queue对应的死信队列
     * @param commonDeadLetterExchange 通用死信交换机
     * @return
     */
    @Bean
    public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) {
        return BindingBuilder
                .bind(userDeadLetterQueue)
                .to(commonDeadLetterExchange)
                .with("user-dead-letter-routing-key")
                .noargs();
    }

}
  • 定义好之后启动程序,springboot会读取Spring容器中类型为Queue和Exchange的bean进行队列和交换机的初始化与绑定。当然也可以自己在RabbitMQ的管理后台进行手动创建与绑定。
  • 查看管理后台

交换机

队列

队列与死信交换机

# 测试

  • 消息生产者
/**
 * @author futao
 * @date 2020/4/7.
 */
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send() {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user);
    }
}

1. 场景1.1

消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。

nack()与reject()的区别是:reject()不支持批量拒绝,而nack()可以.

  • 消费者代码
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Component
public class Consumer {

    /**
     * 正常用户队列消息监听消费者
     *
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user, Message message, Channel channel) {
        log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
        try {
            //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("消息拒绝签收失败", e);
        }
    }

    /**
     * @param user
     * @param message
     * @param channel
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user, Message message, Channel channel) {
        log.info("接收到死信消息:[{}]", JSON.toJSONString(user));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey());
        } catch (IOException e) {
            log.error("死信队列消息签收失败", e);
        }
    }
}
  • 可以看到,正常消息被NACK之后最后到了死信队列,且路由键发生了变化。

死信消息

1. 场景1.2

消费者设置了自动签收,当重复投递次数达到了设置的最大retry次数之后,消息也会投递到死信队列,但是内部的原理还是调用了nack/reject

  • application.yml中需要更改一些配置
spring:
  application:
    name: learn-rabbitmq
  rabbitmq:
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 自动签收
        acknowledge-mode: auto
        retry:
          enabled: true
          # 第一次尝试时间间隔
          initial-interval: 10S
          # 两次尝试之间的最长持续时间。
          max-interval: 10S
          # 最大重试次数(=第一次正常投递1+重试次数4)
          max-attempts: 5
          # 上一次重试时间的乘数
          multiplier: 1.0
  • 消费者代码
/**
 * @author futao
 * @date 2020/4/9.
 */
@Slf4j
@Configuration
public class AutoAckConsumer {

    /**
     * 正常用户队列消息监听消费者
     *
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user}")
    public void userConsumer(User user) {
        log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user));
        throw new RuntimeException("模拟发生异常");
    }

    /**
     * @param user
     */
    @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}")
    public void userDeadLetterConsumer(User user) {
        log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user));
    }
}
  • 测试结果:

image.png

image.png

  • 从测试结果可以看出,消息如果未被正常消费,则进行重试,如果最终还未被正常消费,则会被投递到死信队列。

initial-interval,max-interval这两个参数啥作用不知道,现在测试的结果是一直都会取最短的那个时间作为下次投递时间...

2. 测试场景 2

消息过期,过了TTL存活时间。

  • 需要修改队列定义,设置队列消息的过期时间x-message-ttl.
    /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列
                .withArgument("x-message-ttl", 5000)
                .build();
    }
  • user-queue的消费者注释,使消息无法被消费,直到消息在队列中的时间达到设定的存活时间。

ttl

  • 根据日志可以看到,消息在5S后会被投递到死信队列。

image.png

  • 注意:可以给队列设置消息过期时间,那么所有投递到这个队列的消息都自动具有这个属性。还可以在消息投递之前,给每条消息设定指定的过期时间。(当两者都设置了,则默认取较短的值)

下面测试给每条消息设置指定的过期时间:

  • 修改消息生产者:
/**
 * @author futao
 * @date 2020/4/7.
 */
@Slf4j
@Component
public class DeadLetterSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Value("${app.rabbitmq.exchange.user}")
    private String userExchange;

    public void send(String exp) {
        User user = User.builder()
                .userName("天文")
                .address("浙江杭州")
                .birthday(LocalDate.now(ZoneOffset.ofHours(8)))
                .build();
        log.info("消息投递...指定的存活时长为:[{}]ms", exp);
        rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                //为每条消息设定过期时间
                messageProperties.setExpiration(exp);
                return message;
            }
        });
    }
}

image.png

  • 从测试结果可以看出,每条消息都在指定的时间投递到了死信队列。

【坑】重点注意!!!:RabbitMQ对于消息过期的检测:只会检测最近将要被消费的那条消息是否到达了过期时间,不会检测非末端消息是否过期。造成的问题是:非末端消息已经过期了,但是因为末端消息还未过期,非末端消息处于阻塞状态,所以非末端消息不会被检测到已经过期。使业务产生与预期严重不一致的结果。

  • 对上面的问题进行测试:(第一条消息的过期时间设置成10S,第二条消息设置成5S)

image.png

  • 从测试结果可以看出,id为1的消息存活时长为10S,id为2的消息存活时间为5S。但是只有当第一条消息(id=1)过期之后,id=2的消息到达队列末端,才会被检测到已经过期。

3. 测试场景3

队列设置了x-max-length最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。

  • 修改队列定义
  /**
     * 用户队列
     *
     * @param userQueueName 用户队列名
     * @return
     */
    @Bean
    public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName,
                           @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) {
        return QueueBuilder
                .durable(userQueueName)
                //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置)
                .withArgument("x-dead-letter-exchange", commonDeadLetterExchange)
                //声明该队列死信消息在交换机的 路由键
                .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key")
                //队列最大消息数量
                .withArgument("x-max-length", 2)
                .build();
    }

image.png

  • 向队列中投递消息

image.png

  • 从结果可以看出,当投递第3条消息的时候,RabbitMQ会把在最靠经被消费那一端的消息移出队列,并投递到死信队列。

image.png

队列中将始终保持最多两个消息。

# 其他:

  • Queue的可配置项可在RabbitMQ的管理后台查看:

image.png

  • 源码:https://github.com/FutaoSmile/springboot-learn-integration/tree/master/springboot-learn-rabbitmq

# 相关:

SpringBoot RabbitMQ实现消息可靠投递

# TODO:

  • 消费端限流保护
  • 延迟队列
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 喜欢天文 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • # 概念:
  • # 代码实现
  • # 测试
  • # 其他:
  • # 相关:
  • # TODO:
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档