前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ消息的可靠性投递

RabbitMQ消息的可靠性投递

原创
作者头像
会洗碗的CV工程师
发布2024-05-01 10:10:03
2170
发布2024-05-01 10:10:03
举报
文章被收录于专栏:消息中间件

一、概念

RabbitMQ消息投递的路径为:

生产者 ---> 交换机 ---> 队列 ---> 消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

RabbitMQ的消息可靠性投递是确保消息在生产、传输和消费过程中能够准确、完整地到达目的地的重要机制。以下是关于RabbitMQ消息可靠性投递的一些关键概念和方法:

  1. 消息的确认机制:
  • 自动确认模式(Auto Acknowledgment):在这种模式下,当消费者接收到消息后,RabbitMQ会自动将消息标记为已确认,并从队列中删除。然而,这种模式下,消息一旦被投递给消费者,就会被认为已经被成功处理,无论消费者是否真正处理了该消息。
  • 手动确认模式(Manual Acknowledgment):在这种模式下,消费者需要在处理完消息后,显式地向RabbitMQ发送一个确认回执。这样,RabbitMQ才会将消息从队列中删除。手动确认模式确保了消息的可靠处理,即使消费者处理过程中发生异常,消息也不会丢失。
  1. 消息的持久化:
  • 队列的持久化:在声明队列时,可以指定队列是否持久化。持久化的队列在RabbitMQ重启后仍然存在,并且其中的消息也不会丢失。
  • 消息的持久化:在发布消息时,可以将其标记为持久化。这样,即使RabbitMQ重启或发生故障,消息也不会丢失。
  1. 重试机制:
  • 自动重试:在消费者端,可以通过使用basic.recover()方法进行消息的自动重试。当该方法被调用时,RabbitMQ将重新投递消息,直到投递成功或者消息被拒绝。
  • 延迟队列方式:RabbitMQ还支持通过使用延迟队列(dead-letter queue)实现消息的重试。在这种方式中,当消息一次投递失败后,消息将被重新投递到延迟队列中。延迟队列的作用是将消息保留一段时间,然后再将其重新投递到原队列中进行处理。
  1. confirm机制和return机制:
  • confirm机制:用于确保消息从生产者到交换机的过程中被正确处理。如果消息未能成功到达交换机,生产者将收到确认失败的通知,并可以选择重新发送消息。
  • return机制:用于确保消息从交换机到队列的过程中被正确处理。如果消息在路由过程中出现问题(如找不到匹配的队列),RabbitMQ将向生产者发送一个return通知,其中包含有关失败原因的信息。生产者可以根据这些信息选择重新发送消息或执行其他操作。

综上所述,RabbitMQ通过提供消息的确认机制、持久化、重试机制以及confirm和return机制等功能来确保消息的可靠性投递。这些机制共同协作,使得RabbitMQ成为一个高效、稳定且可靠的消息代理软件。接下来详细说明上面这些保证消息投递的可靠性机制:

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。

退回模式(return)可以监听消息是否从交换机成功传递到队列。

消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

代码语言:yml
复制
spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /

在生产者的配置类创建交换机和队列

代码语言:java
复制
@Configuration
public class RabbitMQConfig {

    private final String EXCHANGE_NAME = "my_topic_exchange";
    private final String QUEUE_NAME = "my_queue";

    // 创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange getExchange(){
        return ExchangeBuilder
                // 交换机类型
                .topicExchange(EXCHANGE_NAME)
                // 是否持久化
                .durable(true)
                .build();

    }

    // 创建队列
    @Bean(QUEUE_NAME)
    public Queue getMessageQueue(){
        return new Queue(QUEUE_NAME);
    }

    // 交换机绑定队列
    @Bean
    public Binding bindingMessageQueue(@Qualifier(EXCHANGE_NAME) Exchange exchange,
                                       @Qualifier(QUEUE_NAME) Queue queue){
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with("my_routing")
                .noargs();

    }
}

二、确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

生产者配置文件开启确认模式

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated

生产者定义测试确认模式的回调方法

代码语言:javascript
复制
@Test
public void testConfirm(){
    // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
         * 被调用的回调方法
         * @param correlationData 相关配置信息
         * @param ack 交换机是否成功收到了消息
         * @param cause 失败原因
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack){
                System.out.println("Confirm接收成功");
            }
            else {
                System.out.println("Confirm接收失败,原因为: "+cause);
            }
        }
    });
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送消息
     */
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","最后发了一个狗头和撤回了一条消息,我没回");
}

运行之后,控制台应该打印出相关的测试信息:如下图:

并且,可以看到管控台也是出现了相关的交换机和队列消息:

三、退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,

使用方法如下:

生产者配置文件开启退回模式

代码语言:javascript
复制
spring:
  rabbitmq:
    host: 192.168.66.100
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 开启确认模式
    publisher-confirm-type: correlated
    # 开启回退模式
    publisher-returns: true

生产者定义退回模式的回调方法,如何让他发送失败回调方法呢,很简单,只需要放一个不存在的路由键即可,代码如下:

代码语言:javascript
复制
@Test
public void testReturn() {
    // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        /**
         * @param returnedMessage 失败后将失败信息封装到改参数中
         */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            System.out.println("消息对象: "+returnedMessage.getMessage());
            System.out.println("错误码: "+returnedMessage.getReplyCode());
            System.out.println("错误信息: "+returnedMessage.getReplyText());
            System.out.println("交换机: "+returnedMessage.getExchange());
            System.out.println("路由键: "+returnedMessage.getRoutingKey());
        }
    });
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送消息
     */
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","到今天也没有给我发消息");
}

执行后如下图:

如果是已经存在的路由键,则不会执行改回调方法:如下图:

可以看到什么都没有

四、Ack

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

自动确认:spring.rabbitmq.listener.simple.acknowledge="none"

手动确认:spring.rabbitmq.listener.simple.acknowledge="manual"

消费者配置开启手动签收

消费者处理消息时定义手动签收的情况,代码如下:

代码语言:javascript
复制
@Component
public class AckConsumer {
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message message, Channel channel) throws InterruptedException, IOException {
        // 消息投递序号,消息每次投递该值都会+1
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("成功接收到消息: "+message);
            /**
             * 参数1:消息投递序号
             * 参数2:一次是否可以签收多条消息
             */
            channel.basicAck(deliveryTag,true);
        } catch (IOException e) {
            System.out.println("消息接收失败!");
            Thread.sleep(2000);
            /**
             * 参数1:消息投递序号
             * 参数2:一次是否可以签收多条消息
             * 参数3:拒签后消息是否可以重回到队列中
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

成功执行后如下图:

那么如何模拟当程序出现bug拒绝签收呢,只需要在try语句加一句int i=1/0即可执行后如下:

可以看得到,一直在打印消息接收失败,那就是因为消费者一直向RabbitMQ接收消息。但是一直处理失败。

我正在参与2024腾讯技术创作特训营最新征文,快来和我瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概念
  • 二、确认模式
  • 三、退回模式
  • 四、Ack
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档