前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbitmq可靠消息投递,消息确认机制

Rabbitmq可靠消息投递,消息确认机制

原创
作者头像
vivi
修改2021-01-18 14:39:41
6310
修改2021-01-18 14:39:41
举报
文章被收录于专栏:vblogvblog

前言

我们知道,消息从发送到签收的整个过程是

Producer-->Broker/Exchange-->Broker/Queue-->Consumer,因此如果只是要保证消息的可靠投递,我们需要考虑的仅是前两个阶段,因为消息只要成功到达队列,就算投递成功。

  • 比如投递消息时指定的Exchange不存在,那么阶段一就会失败
  • 如果投递到Exchange成功,但是指定的路由件错误或者别的原因,消息没有从Exchange到达Queue,那就是第二阶段出错。
在这里插入图片描述
在这里插入图片描述

而从生产者和消费者角度来看,消息成功投递到队列才算成功投递,因此阶段一和阶段而都属于生产者一方需要关注,阶段三属于消费者一方,这里只考虑消息的成功投递,因此不考虑消费者的签收部分。而Rabbitmq和springboot整合时,默认是**没有开启**消息确认的。

开启消息确认机制

一、Producer --> Broker/Exchange ConfirmCallback
1. 配置
代码语言:txt
复制
# springboot2.2.0以前,

# spring.rabbitmq.publisher-confirms=true

# springboot2.2.0以后

spring.rabbitmq.publisher-confirm-type=correlated

关于这个Type有三种取值:

  • none:默认值,不开启confirmcallback机制
  • correlated:开启confirmcallback,发布消息时,可以指定一个CorrelationData,会被保存到消息头中,消息投递到Broekr时触发生产者指定的ConfirmCallback,这个值也会被返回,以进行对照处理,CorrelationData可以包含比较丰富的元信息进行回调逻辑的处理。无特殊需求,就设定为这个值。
  • simple:这个比较复杂,spring官方指出:
代码语言:txt
复制
> Normally, when using the template, a Channel is checked out of the cache (or created), used for the operation, and returned to the cache for reuse. In a multi-threaded environment, there is no guarantee that the next operation uses the same channel. There may be times, however, where you want to have more control over the use of a channel and ensure that a number of operations are all performed on the same channel.
代码语言:txt
复制
2529 / 5000

翻译结果

通常,使用模板时,会从缓存中检出(或创建)通道,以进行操作,然后将其返回到缓存中以进行重用。在多线程环境中,不能保证下一个操作使用相同的通道。但是,有时您可能希望更好地控制通道的使用,并确保在同一通道上执行全部操作。

代码语言:txt
复制
也就说这,这个`simple`模式:其一效果和`correlated`值一样能触发回调方法,其二用于发布消息成功后使用rabbitTemplate调用`waitForConfirms`或`waitForConfirmsOrDie`方法等待`broker`节点返回发送结果,需求根据返回结果来判定下一步的逻辑,执行更复杂的业务。要注意的点是`waitForConfirmsOrDie`方法如果返回false则会关闭`channel`,则接下来无法发送消息。
2. 如何使用

SpringBoot自动配置帮我们往容器中注册了一个RabbitTemplate,但因为默认没有开启消息确认机制,因此它在创建时并未配置confirmCallback属性,我们需要手动为其创建一个 RabbitTemplate.ConfirmCallback

因此我们需要拿到这个创建好的RabbitTemplate,再手动执行其setConfirmCallback方法。

拿到这个对象简单,只要一个@Autowired,问题是我们要保证之后通过@Autowired拿到的RabbitTemplate是已经注册了ConfirmCallback的。

我们手写一个配置类/或者随便什么类,加上注解@Component/@Service/@Controller/@Configuration,无论哪个,只要能被自动创建并加入容器,然后我们写一个方法,加上@PostConstructor表示创建这个对象完成时需要回调这个方法,我们在这个类中拿到RabbitTemplate,在这个方法中执行它的setConfirmCallback,这样spring容器在创建我们这个配置类的时候将创好的RabbitTemplate进行了完善,而整个过程都是在spring'boot启动过程中自动完成,就能保证我们之后使用@Autowired拿到的RabbitTemplate就是注册号confirmCallback的。

代码语言:txt
复制
@Configuration

@Slf4j

public class RabbitConfig {



    @Autowired

    RabbitTemplate rabbitTemplate;



     // 方法名无所谓,主要是 @PostConstruct 指定它一定会被回调

    @PostConstruct

    public void setCallback() {

        /\*\*

         \* 为容器创建好的rabbitTemplate注册confirmCallback

         \* 消息由生产者投递到Broker/Exchange回调

         \*/

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

            /\*\*

             \* @param correlationData 发送消息时指定的唯一关联数据(消息id)

             \* @param ack 这个消息是否成功投递到Exchange

             \* @param cause 失败的原因

             \*/

            @Override

            public void confirm(CorrelationData correlationData, boolean ack, String cause) {

                if (ack) {

                    log.info("消息投递到交换机成功:[correlationData={}]",correlationData);

                } else {

                    log.error("消息投递到交换机失败:[correlationData={},原因:{}]", correlationData, cause);

                }

            }

        });

    }

}
二、Exchange-->Queue ConfirmCallback
1. 配置
  • 注意下面两项**必须**同时配置,可以尝试不配置第二项,通过测试能够发现当消息路由到Queue失败(比如路由件错误)时,returnCallback并未被回调。
代码语言:txt
复制
# 开启阶段二(消息从E->Q)的确认回调    Exchange --> Queue  returnCallback

spring.rabbitmq.publisher-returns=true

# 官方文档说此时这一项必须设置为true

# 实际上这一项的作用是:消息【未成功到达】队列时,能监听到到路由不可达的消息,以异步方式优先调用我们自己设置的returnCallback,默认情况下,这个消息会被直接丢弃,无法监听到

spring.rabbitmq.template.mandatory=true
2. 如何使用

和注册confirmCallback的原理一样,就不多赘述,直接看配置,需要注意的是 **这个回调只会在消息在从Exchange投递到Queue【失败】时被执行**。

代码语言:txt
复制
@Configuration

@Slf4j

public class RabbitConfig {



    @Autowired

    RabbitTemplate rabbitTemplate;



    @PostConstruct

    public void setCallback() {

        

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

             /\*\*

             \* 能够看出来,参数中并没有像confirmCallback那样提供的boolean类型                         的ack,因此这个回调只是在【失败】情况下触发的

             \* @param message 发送的消息

             \* @param replyCode 回复错误码

             \* @param replyText 回复错误内容

             \* @param exchange  发送消息时指定的交换机

             \* @param routingKey 发送消息时使用的路由件

             \*/

            @Override

            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {

                log.error("路由到队列失败,消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}", message, exchange, routingKey, replyCode, replyText);

            }

        });

    }

}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 开启消息确认机制
    • 一、Producer --> Broker/Exchange ConfirmCallback
      • 1. 配置
      • 2. 如何使用
    • 二、Exchange-->Queue ConfirmCallback
      • 1. 配置
      • 2. 如何使用
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档