前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RabbitMQ】消息可靠性投递

【RabbitMQ】消息可靠性投递

原创
作者头像
后端码匠
修改2023-11-12 15:59:09
2600
修改2023-11-12 15:59:09
举报
文章被收录于专栏:后端码匠

RabbitMQ消息可靠性投递

什么是消息的可靠性投递?即保证消息百分百发送到消息队列中去,消息发送端需要接受到mq服务端接受到消息的确认应答。除此之外还应有完善的消息补偿机制,发送失败的消息可以再感知并二次处理。 生产者到交换机通过confirmCallback交换机到队列通过returnCallback

当前环境

RabbitMQConfig

代码语言:java
复制
package cn.com.codingce.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

    /**
     * 交换机名称
     */
    public static final String EXCHANGE_NAME = "health_hra3_exchange";

    /**
     * 队列名称
     */
    public static final String QUEUE = "health_hra3_queue";


    @Bean
    public Exchange healthHra3Exchange() {
        // 创建交换机,durable代表持久化,使用Bean注入
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    }

    @Bean
    public Queue healthHra3Queue() {
        // 创建队列,使用Bean注入
        return QueueBuilder.durable(QUEUE).build();
    }

    /**
     * 交换机和队列绑定关系
     *
     * @param queue    上面注入的队列Bean,如果你的项目又多个,记得给Bean取名字
     * @param exchange 上面注入的交换机Bean
     */
    @Bean
    public Binding healthHra3Binding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("health.#").noargs();
    }

}

HealthHra3MQListener

代码语言:java
复制
package cn.com.codingce.listener;

import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = RabbitMQConfig.QUEUE) // 监听的队列名称
public class HealthHra3MQListener {

    /**
     * RabbitHandler会自动匹配消息类型(消息自动确认)
     *
     * @param msg     发送的是String类型,这里用String进行接收,RabbitHandler会自动进行匹配
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {
        log.info("releaseCouponRecord into"); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        long msgTag = message.getMessageProperties().getDeliveryTag();
        log.info("监听到消息:消息内容,msg={}", msg); // 监听到消息:消息内容,msg=新HRA3报告来啦!!
        log.info("msgTag={}", msgTag); // msgTag=1
        log.info("message={}", message.toString()); // message=(Body:'新HRA3报告来啦!!' MessageProperties [headers={}, ……
    }

}

yml

代码语言:yaml
复制
server:
  port: 9090

spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    # 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated

  thymeleaf.cache: false

可靠性投递confirmCallback

confirmCallback是生产者到交换机,可以理解为确认消息是否发送成功。新版依赖可靠性投递默认是关闭的,使用以下方法开启:

代码语言:shell
复制
#旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated

yml

代码语言:yaml
复制
server:
  port: 9090

spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    #新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated

  thymeleaf.cache: false

编码实现confirmCallback

代码语言:java
复制
package cn.com.codingce.controller;

import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("api")
@Slf4j
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 可靠性投递confirmCallback
     *
     * @return
     */
    @GetMapping("/confirmCallback")
    public R confirmCallback() {
        log.info("可靠性投递 confirmCallback");
         /*
          correlationData:配置
          ack:交换机是否收到消息,true是成功,false是失败
          cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("confirm==== ack={}", ack);
            log.info("confirm==== cause={}", cause);
            if (ack) {
                log.info("发送成功,{}", cause);
            } else {
                log.error("发送失败,{}", cause);
            }
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "health.new", "新HRA3报告来了!!");
        return R.ok();
    }

    @GetMapping(value = "/default", produces = "text/html;charset=utf-8")
    public String getDefault() {
        return "队列服务运行正常...";
    }

}

LOG

代码语言:shell
复制
2023-11-12 15:48:31.782  INFO 6840 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-12 15:48:31.782  INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-11-12 15:48:31.783  INFO 6840 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-11-12 15:48:42.146  INFO 6840 --- [nio-9090-exec-5] c.c.codingce.controller.SendController   : 可靠性投递 confirmCallback
2023-11-12 15:48:42.155  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : confirm==== ack=true
2023-11-12 15:48:42.156  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : confirm==== cause=null
2023-11-12 15:48:42.156  INFO 6840 --- [nectionFactory1] c.c.codingce.controller.SendController   : 发送成功,null
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : releaseCouponRecord into
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : 监听到消息:消息内容,msg=新HRA3报告来了!!
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : msgTag=1
2023-11-12 15:48:42.159  INFO 6840 --- [ntContainer#0-1] c.c.c.listener.healthHra3MQListener      : message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=health_hra3_exchange, receivedRoutingKey=health.new, deliveryTag=1, consumerTag=amq.ctag-7HDzHnEZ0foZ_MrQmGqFYQ, consumerQueue=health_hra3_queue])

可靠性投递returnCallback

returnCallback交换机到队列,消息从交换器发送到对应队列失败时触发:

  • 第一步 开启returnCallback配置
代码语言:text
复制
spring.rabbitmq.publisher-returns=true #新版
  • 第二步 修改交换机投递到队列失败的策略
代码语言:text
复制
# 为true,则交换机处理消息到路由失败,则会返回给生产者
spring.rabbitmq.template.mandatory=true

yml

代码语言:yaml
复制
server:
  port: 9090

spring:
  application:
    # 微服务系统有意义, 养成好习惯, 先写出来
    name: rabbitmq-02-springboot
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /codingce
    # 新版,NONE值是禁用发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
    #########################################################################################
    # 为true,则交换机处理消息到路由失败,则会返回给生产者
    publisher-returns: true

  thymeleaf.cache: false

编码实实现returnCallback

代码语言:java
复制
package cn.com.codingce.controller;

import cn.com.codingce.common.utils.R;
import cn.com.codingce.config.RabbitMQConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("api")
@Slf4j
public class SendController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/returnCallback")
    public R returnCallback() {
        log.info("交换机到队列通过returnCallback 可靠性投递 returnCallback");
        // 为true,则交换机处理消息到路由失败,则会返回给生产者,开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            int code = returnedMessage.getReplyCode();
            log.info("returnCallback code={}", code);
            log.info("returnCallback returned={}", returnedMessage);
        });
        // 这个routingKey是不存在的,它找不到这个路由,所以会出现异常从而触发上面的回调方法
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.health.new", "新HRA3报告来了!!");
        return R.ok();
    }

    @GetMapping(value = "/default", produces = "text/html;charset=utf-8")
    public String getDefault() {
        return "队列服务运行正常...";
    }

}

LOG

代码语言:shell
复制
2023-11-12 15:53:22.820  INFO 4848 --- [nio-9090-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-12 15:53:22.820  INFO 4848 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-11-12 15:53:22.821  INFO 4848 --- [nio-9090-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2023-11-12 15:53:22.831  INFO 4848 --- [nio-9090-exec-1] c.c.codingce.controller.SendController   : 交换机到队列通过returnCallback 可靠性投递 returnCallback
2023-11-12 15:53:22.839  INFO 4848 --- [nectionFactory1] c.c.codingce.controller.SendController   : returnCallback code=312
2023-11-12 15:53:22.840  INFO 4848 --- [nectionFactory1] c.c.codingce.controller.SendController   : returnCallback returned=ReturnedMessage [message=(Body:'新HRA3报告来了!!' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=health_hra3_exchange, routingKey=xxx.health.new]

!!!开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ消息可靠性投递
    • 当前环境
      • RabbitMQConfig
      • HealthHra3MQListener
      • yml
    • 可靠性投递confirmCallback
      • 编码实现confirmCallback
      • LOG
    • 可靠性投递returnCallback
      • 编码实实现returnCallback
      • LOG
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档