专栏首页算法之名RabbitMQ消息的100%投递 顶

RabbitMQ消息的100%投递 顶

生产端的可靠性投递

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点确认应答
  • 完善的消息进行补偿机制

解决方案

  • 消息落库,对消息状态进行打标
  • 消息的延迟投递,做二次确认,回调检查

消息落库架构图

上图中BIZ DB为我们的业务库,比方说保存订单;MSG DB为消息库,保存我们发送到MQ消息。如果在Step 3的时候,网络出现故障,Confirm机制没有收到broker的消息确认。我们需要设置一个时间临界点,比如说5分钟,检索出消息库中状态为0的消息,通过分布式定时任务,比如XXL-Job或者Elastic-Job。但有可能出现消息刚发出去,还没有Confirm成功,定时任务就已经启动了,并把发送成功的消息确认为未成功,所以我们需要有一个Step 6,给入库消息一个最大的容忍时间,比如说2分钟到5分钟。比如消息入库的时候需要带上时间,我们取出状态为0的消息形成一个集合,然后过滤该集合的时间为2分钟以上的消息进行重新发送。由于MQ消息的配置本身有问题的情况下(比如说路由,队列,交换机),会出现消息永远无法发送成功,这个时候我们需要有一个消息重试的机制,比如3次,如果3次都没有发送成功,则更新该消息状态为2,表示失败。

但是这种方式有一定的局限性,因为要做数据库的二次入库操作,磁盘io会成为瓶颈,在高并发的场景下并不合适。由于我们的业务入库是必须的,所以我们要考虑消息入库是否可以取消。所以我们要考虑第二种方式:消息的延迟投递,做二次确认,回调检查。

这里的Upstream service为我们的上流业务,我们肯定要做的是进行业务入库,然后再发消息到Broker,这里的不同在于Step2,Step2需要再发一条消息,但这条消息是一个延迟消息(这种延迟消息并不是死信队列那种延迟消息,我们可以用DelayQueue来发这条消息),可能是2到5分钟之后才发出去的,该消息与Step1虽然消息内容一样,但是它们投递的队列不同,该队列被Call back service监听。Downstream service为MQ消费者,但它同时也是消息投递者,它会在Step4发送一个confirm消息(该消息也是一个MQ消息)到Broker,但是这条消息并不是发给上流生产者的,而是发给Call back service的,Call back service作为消息消费者收到这条消息,然后做一个消息的持久化存储,存入数据库中。3到5分钟之后,Call back service收到延迟发送的Step2的消息,再去对比消息数据库中的消息,更新消息数据库中的状态为1。如果在此过程中,Downstream service没有发送消息给Call back service,则消息数据库中没有这条消息,当Call back service收到Step2的延迟消息的时候,就通过RPC或者Restful重新调用upstream中的业务库检查后,再重新发送一条step1消息,走下一个流程。

以上的这个方案并不能保证100%的消息投递,但是它对于第一种方案的好处是少进行了一次DB操作,保证高并发的性能。比如说第一种方式的吞吐量为1000,那第二种方式的吞吐量就可以翻倍到2000,可以节省一台服务器。这里把消息补偿机制从第一种方式的嵌入到核心链路给解耦为一个单独的微服务,核心链路则只有消息发送到消息处理。同时下游业务处理应该加上消息的幂等,确保只处理一个消息。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ延迟队列设置 顶

    延迟消费。比如:用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单;用户注册成功之后,需要过一段时间比如一周后校验用户的使...

    算法之名
  • Spark RDD篇

    RDD是一个抽象,会记录一些信息,他并不是一个真正的集合,但可以像集合一样操作,降低了开发难度。

    算法之名
  • NIO如何多线程操作 顶

    因为NIO本身是非阻塞的,所以他的消息选择器Selector可以在单线程下连接多台客户端的访问。

    算法之名
  • ActiveMQ介绍

    1、ActiveMQ服务器工作模型       通过ActiveMQ消息服务交换消息。消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。这些消...

    小帅丶
  • 直播评论系统分析设计

    直播评论系统是电商系统一个常用的功能,即在发布新品的时候,为了吸引用户参与和营造互动气氛,让参与的每个人都可以发消息,发完后每个人都可以即时看到新消息,原型和 ...

    心平气和
  • 消息队列设计精要

    消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。 当今市面上...

    美团技术团队
  • Kafka消息规范

    Kafka作为一个消息队列,有其自己定义消息的格式。Kafka中的消息采用ByteBuf,之所以采用ByteBuf这种紧密的二进制存储格式是因为这样可以节省大量...

    shysh95
  • MQ·将多消息合并为一条消息的发送、消费的设计与实现

    这是笔者最近处理一个叫异步大点击的业务问题所思考出来的方案。由于mq使用的是亚马逊的sqs服务,而sqs是按请求数消费的原因,所以才有的将多消息合并为一条消息发...

    黄泽杰
  • 什么是消息系统?

    首先来看什么是消息系统?简单来讲就是生产者发送包含事件的消息给消息系统,然后将消息推送给消费者。消息系统有很多种,最简单的就是TCP连接这种的直接信道,之后出现...

    哒呵呵
  • 轻松搞定RocketMQ入门

    (1)NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步

    烂猪皮

扫码关注云+社区

领取腾讯云代金券