前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ消息队列之实现可靠投递的请求-确认机制

RabbitMQ消息队列之实现可靠投递的请求-确认机制

作者头像
JavaEdge
发布2021-02-22 16:05:33
1.1K0
发布2021-02-22 16:05:33
举报
文章被收录于专栏:JavaEdge

0 可靠投递的意义

保证消息不丢失,可靠抵达,可使用事务消息,性能下降250倍,为此引入确认机制

  • publisher confirmCallback确认模式
  • publisher returnCallback未投递到queue退回模式
  • consumer ack机制

使用 RabbitMQ 时,消息发送方期望杜绝任何消息丢失或投递失败场景(否则 MQ 将失去其存在意义)。RabbitMQ 提供两个选项控制消息的投递可靠性模式。

RabbitMQ 消息投递的链路

代码语言:javascript
复制
producer->rabbitmq broker cluster->exchange->queue->consumer

message

  • 从 producer 到 broker 会返回一个 confirmCallback
  • 从 exchange->queue 投递失败则会返回一个 returnCallback 。 利用这两个 callback 控制消息的最终一致性和部分纠错能力。

保证消息的百分百投递成功。

1 confirmCallback 确认模式

在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。

代码语言:javascript
复制
CachingConnectionFactory factory = new CachingConnectionFactory();
// 开启confirm模式
factory.setPublisherConfirms(true);
代码语言:javascript
复制
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

ConfirmCallback 接口

代码语言:javascript
复制
public interface ConfirmCallback {

		/**
		 * Confirmation callback.
		 * @param correlationData correlation data for the callback.
		 * @param ack true for ack, false for nack
		 * @param cause An optional cause, for nack, when available, otherwise null.
		 */
		void confirm(CorrelationData correlationData, boolean ack, String cause);

	}
CorrelationData 对象
  • 标记接口,用于发送关联消息的信息数据。 一个适用场景比如关联一个发送确认通知。
  • CorrelationData 基类,用于关联发布确认到发送消息。 使用org.springframework.amqp.rabbit.core.RabbitTemplate方法,包括,这些作为参数之一; 当接收到发布确认时,该CorrelationData返回ACK / NACK

每个发送的消息都需配备一个 CorrelationData 相关数据对象,内部的 id 属性即用来表示当前消息的唯一性。

发送时创建一个 CorrelationData 对象。

代码语言:javascript
复制
User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id,纯粹 demo,真实场景是需要做业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对账。

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback

  • 如果是 cluster 模式,需所有 broker 接收到才会调用 confirmCallback

被 broker 接收到只能表示 message 已到达服务器,并不能保证消息一定会被投递到目标 queue。所以还需 returnCallback

2 returnCallback 未投递到queue退回模式

confrim 模式仅保证消息到达 broker,无法保证消息准确投递到目标 queue。 有些业务场景下,我们需保证消息一定要投递到目标 queue,这时就要用到 return 退回模式。

同样创建 ConnectionFactory 到时候需要设置 PublisherReturns(true) 选项。

代码语言:javascript
复制
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
代码语言:javascript
复制
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

1 Producer 的可靠性投递

1.1 要求

  1. 保证消息的成功发出
  2. 保证MQ节点的成功接收
  3. 发送端收到MQ节点(Broker) 确认应答
  4. 完善的消息补偿机制

在实际生产中,很难保障前三点完全可靠。在极端环境,生产者发送消息失败,发送端在接受确认应答时突然发生网络闪断等,很难保障可靠性投递,所以就需第四点完善的消息补偿机制。

1.2 解决方案

1.2.1 消息信心落库,对消息状态进行打标(常见方案)

将消息持久化到DB并设置状态值,收到Consumer的应答就改变当前记录的状态. 再轮询重新发送没接收到应答的消息,注意这里要设置重试次数.

方案流程图
方案实现流程

比如我下单成功 step1 - 对订单数据入BIZ DB订单库,并对因此生成的业务消息入MSG DB消息库

此处由于采用了两个数据库,需要两次持久化操作,为了保证数据的一致性,有人可能就想着采用分布式事务,但在大厂实践中,基本都是采用补偿机制!

这里一定要保证step1 中消息都存储成功了,没有出现任何异常情况,然后生产端再进行消息发送。如果失败了就进行快速失败机制

对业务数据和消息入库完毕就进入 setp2 - 发送消息到 MQ 服务上,如果一切正常无误消费者监听到该消息,进入

step3 - 生产端有一个Confirm Listener,异步监听Broker回送的响应,从而判断消息是否投递成功

  • step4 - 如果成功,去数据库查询该消息,并将消息状态更新为1
  • step5 - 如果出现意外情况,消费者未接收到或者 Listener 接收确认时发生网络闪断,导致生产端的Listener就永远收不到这条消息的confirm应答了,也就是说这条消息的状态就一直为0了,这时候就需要用到我们的分布式定时任务来从 MSG 数据库抓取那些超时了还未被消费的消息,重新发送一遍 此时我们需要设置一个规则,比如说消息在入库时候设置一个临界值timeout,5分钟之后如果还是0的状态那就需要把消息抽取出来。这里我们使用的是分布式定时任务,去定时抓取DB中距离消息创建时间超过5分钟的且状态为0的消息。

step6 - 把抓取出来的消息进行重新投递(Retry Send),也就是从第二步开始继续往下走

step7 - 当然有些消息可能就是由于一些实际的问题无法路由到Broker,比如routingKey设置不对,对应的队列被误删除了,那么这种消息即使重试多次也仍然无法投递成功,所以需要对重试次数做限制,比如限制3次,如果投递次数大于三次,那么就将消息状态更新为2,表示这个消息最终投递失败,然后通过补偿机制,人工去处理。实际生产中,这种情况还是比较少的,但是你不能没有这个补偿机制,要不然就做不到可靠性了。

该方案在高并发的场景下是否合适? 对于第一种方案,需要做两次DB持久化,在高并发下显然DB存在性能瓶颈。 其实在核心链路中

  • 只需入库业务即可
  • 消息没必要先入库,我们可以做消息的延迟投递,做二次确认,回调检查

所以,下面让我们看方案二:

1.2.2 消息延迟投递,两次确认,回调检查(大规模海量数据方案)

大厂经典实现方案。当然这种方案不一定能保障百分百投递成功,但是基本上可以保障大概99.9%的消息是OK的,有些特别极端的情况只能是人工去做补偿了,或者使用定时任务。主要就是为了减少DB操作。

流程图
  • Upstream Service 上游服务,即生产端
  • Downstream service 下游服务,即消费端
  • Callback service 回调服务
  • MQ Broker 消息队列的集群
实现流程
step1:first send

先入库业务消息(BIZ DB),再Pro发出消息,注意顺序!(为了避免性能瓶颈,主流互联网不会采用事务)

step2:Second Send Delay Check

在发送消息之后,紧接着Pro再发一条消息,即延迟消息投递检查,这里需设置一个延迟时间,比如5min后投递。

step3:Listener Consume

Con监听指定的队列,处理收到的消息。

step4:Send Confirm

处理完成之后,发送一个confirm消息,即回送响应。但其不是普通ACK,而是重新生成一条消息,投递到MQ,表示处理成功。

step5:Listener Confirm

Callback service是一个单独的服务,它扮演MSG DB角色,它通过MQ监听下游服务发送的confirm消息,若监听到confirm消息,那么就对其持久化到MSG DB。

step6:Check Detail

step2中的5min后的延迟消息发送到了MQ,然后callback service还是去监听延迟消息所对应的队列,收到Check消息后去检查DB中是否存在消息:

  • 存在,则不需要做任何处理
  • 不存在或者消费失败,那么Callback service就需要主动发起RPC通信给上游服务,告诉它延迟检查的这条消息我没有找到,你需要重发,Pro收到信息后就会重新查询BIZ DB然后将消息发送出去。
设计目的

少做一次DB的存储。 在高并发场景下,最关心的不是消息百分百投递成功,而是保证性能,保证能抗得住这么大的并发量。所以能节省DB操作就尽量节省,异步进行补偿。

其实在主流程里面是没有Callback service的,它属于一个补偿的服务,整个核心链路就是Pro入库业务消息,发送消息到MQ,Con监听队列,消费消息。其他的步骤都是一个补偿机制。

总结

这两种方案都可行。 需要根据实际业务来进行选择,方案二也是互联网大厂更为经典和主流的解决方案。但是若对性能要求不是那么高,方案一要更简单。

参考

  • https://www.cnblogs.com/wangiqngpei557/p/9381478.html
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/07/29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 可靠投递的意义
    • RabbitMQ 消息投递的链路
    • 1 confirmCallback 确认模式
      • ConfirmCallback 接口
      • 2 returnCallback 未投递到queue退回模式
      • 1 Producer 的可靠性投递
        • 1.1 要求
          • 1.2 解决方案
            • 1.2.1 消息信心落库,对消息状态进行打标(常见方案)
            • 1.2.2 消息延迟投递,两次确认,回调检查(大规模海量数据方案)
        • 总结
        相关产品与服务
        消息队列 CMQ
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档