前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【消息中间件】异常和死信消息们的浪浪山

【消息中间件】异常和死信消息们的浪浪山

作者头像
用户10127530
发布2023-10-17 14:29:21
2430
发布2023-10-17 14:29:21
举报
文章被收录于专栏:半旧的技术栈半旧的技术栈

前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 ☕专栏简介:深入、全面、系统的介绍消息中间件 🌰 文章简介:本文将介绍springboot整合rabbitmq,消息可靠性的保证和死信队列等知识

1.springboot整合RabbitMQ

1.1springboot整合生产者

新建项目rabbitmqdemo02,新建模块producer-springboot

在这里插入图片描述
在这里插入图片描述

修改改模块的pom.xml,引入依赖

代码语言:javascript
复制
 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

配置下application.yml

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

启动类com.wangzhou.ProducerApplication。

代码语言:javascript
复制
@SpringBootApplication
public class ProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

按如下结构新建配置类RabbitMQConfig

在这里插入图片描述
在这里插入图片描述

编写配置类。

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    public static final String QUEUE_NAME = "boot_queue";
    public static final String EXCHANGE_NAME = "boot_exchange";

    @Bean("bootExchange")
    public Exchange bootExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue, @Qualifier("bootExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }
}

编写测试类。

代码语言:javascript
复制
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    // 1.注入RabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.haha", "boot mq haha~~~~");
    }
}

运行,rabbitmq管控台就有我们创建的队列了。

在这里插入图片描述
在这里插入图片描述

点进去还可以看到具体的消息详情。

在这里插入图片描述
在这里插入图片描述

1.2 springboot整合消费者

步骤与生产者极其类似。

在这里插入图片描述
在这里插入图片描述

创建工程consumer-springboot

编写pom.xml。引入依赖。

代码语言:javascript
复制
 <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

编写yml配置

代码语言:javascript
复制
spring:
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    virtual-host: /
    port: 5672

新建主启动类。

代码语言:javascript
复制
@SpringBootApplication
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

新建监听类。

代码语言:javascript
复制
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_queue")
    public void ListenQueue(Message message) {
        System.out.println(message);
    }
}

启动类运行。完美!

在这里插入图片描述
在这里插入图片描述

小结下。

在这里插入图片描述
在这里插入图片描述

2.异常消息的浪浪山

2.1 消息可靠性问题

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

上面问题的答案是:发送时丢失(未到交换机或者到交换机未到队列),MQ丢失,消费者丢失。

针对这些可能性,我们将介绍如下高级特性。

在这里插入图片描述
在这里插入图片描述

基于这些问题,我们需要进一步学习MQ的一些高级特性。

2.2 生产者确认机制

在这里插入图片描述
在这里插入图片描述
2.2.1 初始化代码

新建一个工程,mq-advanced-demo。项目的架构如下图。

在这里插入图片描述
在这里插入图片描述
2.2.2 实现生产者确认
(1)生产者配置
在这里插入图片描述
在这里插入图片描述

完整代码。

代码语言:javascript
复制
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
(2)实现ReturnCallback回调

当消息到交换器,但是路由过程中出现问题,通过ReturnCallback回调。

在这里插入图片描述
在这里插入图片描述

每个RabbitTemplate只能配置一个ReturnCallback,而RabbitTemplate是由spring容器创建的,是单例实例。因此ReturnCallback必须在全局进行配置,即在项目启动过程进行配置。

因此,CommonConfig实现了ApplicationContextAware接口。我们知道,Aware是通知接口,而ApplicationContext是一个bean容器,管理spring项目中的bean。因此,实现了ApplicationContextAware接口即意味着可以在项目启动所有bean(当然包括rabbitTemplate)加载以后调用回调,获取rabbitTemplate,设置全局的ReturnCallback。具体细节可以看接口的方法实现setApplicationContext。

代码如下。

代码语言:javascript
复制
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}
(3)发送消息,实现ConfirmCallback回调

当消息甚至还没有到达交换机,通过ConfirmCallback来执行回调策略。这时不需要全局唯一的ConfrimCallback回调,可以每次发消息时指定不同的ConfirmCallback回调。因此代码放到单元测试类中即可。

在这里插入图片描述
在这里插入图片描述

代码如下。

代码语言:javascript
复制
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.准备消息
        String message = "hello, spring amqp!";
        // 2.准备CorrelationData
        // 2.1.消息ID
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 2.2.准备ConfirmCallback
        correlationData.getFuture().addCallback(result -> {
            // 判断结果
            if (result.isAck()) {
                // ACK
                log.debug("消息成功投递到交换机!消息ID: {}", correlationData.getId());
            } else {
                // NACK
                log.error("消息投递到交换机失败!消息ID:{}", correlationData.getId());
                // 重发消息
            }
        }, ex -> {
            // 记录日志
            log.error("消息发送失败!", ex);
            // 重发消息
        });
        // 3.发送消息
        rabbitTemplate.convertAndSend("amq.topic", "simple.test", message, correlationData);
    }
}

上面代码中的交换机和queue及二者的绑定,可以手动的在管控台创建,配置(如果有不需要)。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

跑以下这段测试代码。结果如下。

在这里插入图片描述
在这里插入图片描述

在管控台可以看到消息ready数为1。

在这里插入图片描述
在这里插入图片描述

下面演示下消息根本没有到达交换机,没有返回值的失败情况。将代码中交互机修改成一个不存在的,如aamp

在这里插入图片描述
在这里插入图片描述

2.3 消息持久化

在创建队列与交换机时可以设置是否持久化,这样不会因为宕机而丢失消息。在管控台上傻瓜式,选择Durable即可。

在这里插入图片描述
在这里插入图片描述

在代码里实现也特别简单,指定参数即可。我们这里不演示了,直接把代码截图贴给大家。

在这里插入图片描述
在这里插入图片描述

另外,在spring中队列、交换机和消息默认情况下其实都是持久的哦。

2.4 消费者消息确认

经过生产者消息确认机制和消息持久化,消息一定可以投递到消费者,但是是否消息一定可以被消费还不一定,如果投递时,消费者死了。那就GG了。

因此还需要消费者消息确认机制。

在这里插入图片描述
在这里插入图片描述

先将配置中acknowledge-mode设置成none测试下。

代码语言:javascript
复制
logging:
  pattern:
    dateformat: HH:mm:ss:SSS
  level:
    cn.itcast: debug
spring:
  rabbitmq:
    host: localhost # rabbitMQ的ip地址
    port: 5672 # 端口
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: none

编写下监听类,在代码中模拟下处理过程出现异常的情形。

代码语言:javascript
复制
@Slf4j
@Component
public class SpringRabbitListener {

   @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        log.debug("消费者接收到simple.queue的消息:【" + msg + "】");
        System.out.println(1 / 0);
        log.info("消费者处理消息成功!");
    }
}

如下图,打一个断点。

在这里插入图片描述
在这里插入图片描述

在管控台确定下队列的情况,可以看到现在有一条消息。

在这里插入图片描述
在这里插入图片描述

debug消费者。

在这里插入图片描述
在这里插入图片描述

执行到断点处,查看管控台,队列已经没有消息了。

在这里插入图片描述
在这里插入图片描述

这说明消息已经投递到消费者进行消费了。接着走,消费者就出异常了,消息丢失。

接着来,将 acknowledge-mode:设置为auto。使用生产者发送一条新消息,再用消费者debug。

管控台如下所示,发现unacked字段是1,说明此时消息已经被消费者获取,但是还没有返回值ack。

在这里插入图片描述
在这里插入图片描述

如果放开断点直接跑,消费者会一直刷新获取消息。消息会一直重新尝试投递。

这样的方式比直接丢消息要好一点,但是捏,也不完美,如果消费者代码本身没有问题,消费者会最终将消息消费,如果代码本身有问题,就一直跑着。后面会学习更加升级的做法。

2.5 失败重试机制

上述问题,可以设置重试的上限。设置很简单,在消费者的配置文件里配配就好。

在这里插入图片描述
在这里插入图片描述

读者请自测。

2.6 消费者失败消息处理策略

上面的策略有一个问题,重试多次以后消息就丢了,普通消息无所谓,重要消息那就难受了。

实际上,可以指定消费者失败消息处理策略。

在这里插入图片描述
在这里插入图片描述

第三种策略显然是最完整的,生产中很推荐。其具体做法参考下图。

在这里插入图片描述
在这里插入图片描述

做一下,编写ErrorMessageConfig

代码语言:javascript
复制
@Configuration
public class ErrorMessageConfig {

    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }

    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }

    @Bean
    public Binding errorMessageBinding(){
        return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }

    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

测试下,重新跑下消费者。

在管控台可以检测下queue,exchange及其绑定关系。

在这里插入图片描述
在这里插入图片描述

直接通过管控台给simple.queue发送下消息。

在这里插入图片描述
在这里插入图片描述

效果是这样的。

在这里插入图片描述
在这里插入图片描述

在管控台的error.queue中可以看到消息,甚至可以看到具体的异常栈信息!牛啊!

在这里插入图片描述
在这里插入图片描述

总结下。

在这里插入图片描述
在这里插入图片描述

3.死信消息的浪浪山

3.1 ttl

在这里插入图片描述
在这里插入图片描述

举一个栗子,订单超时未支付则自动取消。

3.1.1 设置队列TTL

下面用代码实现下第一种方式吧。

生产者模块新增配置类TTLRabbitConfiguration.

代码语言:javascript
复制
@Configuration
public class TTLRabbitConfiguration {
    public static final String QUEUE_NAME = "ttl_queue_test";
    public static final String EXCHANGE_NAME = "ttl_exchange_test";

    @Bean("ttlExchange")
    public Exchange ttlExchange() {
        return ExchangeBuilder.directExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean("ttlQueue")
    public Queue ttlQueue() {
        Map<String, Object> args = new HashMap<>();
        // 设置5s的过期时间
        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(QUEUE_NAME).withArguments(args).build();
    }

  @Bean
    public Binding bindTTLQueueExchange(@Qualifier("ttlQueue")Queue queue, @Qualifier("ttlExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();
    }

测试类新增方法。

代码语言:javascript
复制
 @Test
    public void testTTLSend() {
        rabbitTemplate.convertAndSend(TTLRabbitConfiguration.EXCHANGE_NAME, "ttl", "ttl mq haha~~~~");
    }

启动运行,可以看到下图中ttl_queue_test会有标识TTL,有1条消息ready。

在这里插入图片描述
在这里插入图片描述

5s以后,ready的消息数会变成0条。

3.1.2 设置消息TTL

配置类

代码语言:javascript
复制
@Configuration
public class MSGTTLRabbitConfig {
    public static final String QUEUE_NAME = "msg_queue";
    public static final String EXCHANGE_NAME = "msg_exchange";

    @Bean("msgExchange")
    public Exchange msgExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean("msgQueue")
    public Queue msgQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding bindMSGQueueExchange(@Qualifier("msgQueue")Queue queue, @Qualifier("msgExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("msg_ttl").noargs();
    }
}

测试方法。

代码语言:javascript
复制
   @Test
    public void testMsgTTLSend() {
        MessagePostProcessor postProcessor= new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(MSGTTLRabbitConfig.EXCHANGE_NAME, "msg_ttl", "msg_ttl mq haha~~~~",postProcessor);
    }

读者可以自测。

3.2 死信交换机

如果ttl到达,直接将消息删除,消息永久就消失了。实际上业务往往不会真的删除,而是将过期队列中过期的消息移入死信交换机。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

注意与前面所学的消息失败的异常交换机进行对比。可以发现,异常消息是消费者将其投递到异常队列,而死信消费者可不会管事哦。

在这里插入图片描述
在这里插入图片描述

死信交换机当然也可以做异常兜底,但是他还有其它的应用场景。建议异常兜底方案还是使用异常交换机来搞。

由于死信消息会直接由普通队列投递到死信队列,而不是通过consumer,因此,需要在投递时指定死信交换机和对应的路由key。

在这里插入图片描述
在这里插入图片描述

总结下。

在这里插入图片描述
在这里插入图片描述

3.3 延迟队列

在这里插入图片描述
在这里插入图片描述

手工去实现延迟队列多少有点繁琐,可以使用官方插件来快速做。

在这里插入图片描述
在这里插入图片描述

下面来安装下延迟队列插件。

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下面我们会讲解基于Docker来安装RabbitMQ插件,如果您是通过其它方式安装的RabbitMQ,可以选择使用docker再装下或者自己查找对应的插件安装方式。

3.3.1下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

在这里插入图片描述
在这里插入图片描述

大家可以去对应的GitHub页面下载3.8.9版本的插件,地址为https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9这个对应RabbitMQ的3.8.5以上版本。

3.3.2 上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

我们之前设定的RabbitMQ的数据卷名称为mq-plugins,所以我们使用下面命令查看数据卷:

代码语言:javascript
复制
docker volume inspect mq-plugins

可以得到下面结果:

在这里插入图片描述
在这里插入图片描述

接下来,将插件上传到这个目录即可:

在这里插入图片描述
在这里插入图片描述
3.3.3 安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

代码语言:javascript
复制
docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

代码语言:javascript
复制
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

在这里插入图片描述
在这里插入图片描述
3.3.4 使用插件

在管控台。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

或者也可以在代码中做上面同样的工作。

声明下死信交换机。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码贴下。

代码语言:javascript
复制
 @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "delay.queue", durable = "true"),
            exchange = @Exchange(name = "delay.direct", delayed = "true"),
            key = "delay"
    ))
    public void listenDelayExchange(String msg) {
        log.info("消费者接收到了delay.queue的延迟消息");
    }

发消息。

在这里插入图片描述
在这里插入图片描述

代码如下。

代码语言:javascript
复制
   @Test
    public void testSendDelayMessage() throws InterruptedException {
        // 1.准备消息
        Message message = MessageBuilder
                .withBody("hello, ttl messsage".getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .setHeader("x-delay", 5000)
                .build();
        // 2.准备CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 3.发送消息
        rabbitTemplate.convertAndSend("delay.direct", "delay", message, correlationData);

        log.info("发送消息成功");
    }

跑一下,会发现一个问题

在这里插入图片描述
在这里插入图片描述

实际上消息只是延迟了,但是异常队列处理了它。因此我们需要对之前的异常策略进行下增强。将生产者的config进行下增强,判断下是否是延迟消息。

代码语言:javascript
复制
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate对象
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            Integer receivedDelay = message.getMessageProperties().getReceivedDelay();
            if (receivedDelay != null && receivedDelay > 0) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            // 记录日志
            log.error("消息发送到队列失败,响应码:{}, 失败原因:{}, 交换机: {}, 路由key:{}, 消息: {}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有需要的话,重发消息
        });
    }
}

总结下。

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-01-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.springboot整合RabbitMQ
    • 1.1springboot整合生产者
      • 1.2 springboot整合消费者
      • 2.异常消息的浪浪山
        • 2.1 消息可靠性问题
          • 2.2 生产者确认机制
            • 2.2.1 初始化代码
            • 2.2.2 实现生产者确认
          • 2.3 消息持久化
            • 2.4 消费者消息确认
              • 2.5 失败重试机制
                • 2.6 消费者失败消息处理策略
                • 3.死信消息的浪浪山
                  • 3.1 ttl
                    • 3.1.1 设置队列TTL
                    • 3.1.2 设置消息TTL
                  • 3.2 死信交换机
                    • 3.3 延迟队列
                      • 3.3.1下载插件
                      • 3.3.2 上传插件
                      • 3.3.3 安装插件
                      • 3.3.4 使用插件
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档