专栏首页余林丰消费端如何保证消息队列MQ的有序消费

消费端如何保证消息队列MQ的有序消费

消息无序产生的原因

消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序。

场景分析

先后两次修改了商品信息,消息A和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序先后发出了A和B两条消息(消息A先发出,消息B后发出)。按业务逻辑,商品信息的最终状态需要以消息A和消息B综合为准。

看似一个比较常见的同步写数据库,异步发送消息的场景,但实际上需要保证消息的有序消费。

  • 假设1:消息A只包含修改的商品名称,消息B只包含修改的商品重量,此时消息队列的消费端实际上不需要关注消息时序,消息队列消费端(Consumer)只管消费即可。
  • 假设2:消息A包含修改的商品名称、重量,消息B包含修改的商品名称,此时消费端首先接收到消息B,后接收到消息A,那么消息B的修改就会被覆盖。此时消息队列的消费端实际上又需要关注消息时序

可见,你无法保证消息中包含什么信息,此时必须保证消息的有序消费。

业务角度如何保证消息有序消费

  • 生产端在发送消息时,始终保证消息是全量信息。
  • 消费端在接收消息时,通过缓存时间戳的方式,消费消息时判断消息产生的时间是否最新,如果不是则丢弃,如果是则执行下一步。

下面通过伪代码的方式描述:

生产端伪代码 insertWare(ware); #插入数据到数据库,通常在插入数据库时我们只会update修改的字段,而不会全量插入 ware = selectWareById(ware.getId); #获取商品的全量信息(此时是最新的),用于将它放入到消息队列中 syncMq(ware); #异步发送mq消息A

消费端伪代码 ware = fetchWare(); #获取消息 if (isLasted(ware)) #通过商品的修改时间戳判断是否是最新的修改 ​ TODO #执行下一步业务逻辑 else ​ return #丢弃该消息

重点在于消费端如何判断该消息是否是最新的修改也就是isLasted方法。

isLasted方法 Long modified = getCacheById(ware.getId); #获取缓存中该条商品的最新修改时间 If (ware.getModified > modified) { #如果消息中商品修改时间大于缓存中的时间,说明是最新操作 ​ setCacheById(ware); #将该条消息的商品修改时间戳写入到缓存中 ​ return true; } else #如果消息中的商品修改时间小于缓存中的时间,说明该条消息属于“历史操作”,不对其更新 ​ return false;

以上就是通过伪代码的方式,描述如何通过业务手段保证消息有序消费,重点在于全量发送信息和缓存时间戳。在其中还有一些技术实现细节。

例如:消费端消费消息B,执行到获取时间戳缓存之后,并在重新设置新的缓存之前,此时另一个消费端恰好也正在消费B它也正执行到获取时间戳缓存,由于消息A此时并没有更新缓存,消息A拿到的缓存仍然是旧的缓存,这时就会存在两个消费端都认为自己所消费的消息时最新的,造成该丢弃的消息没丢。

显然,这是分布式线程安全问题,分布式锁通常使用Redis或者ZooKeeper,加锁后的执行时序如下图所示。

这是从业务角度保证消息在消费端有序消费。通过在消息发送端全量发送消息以及在消息消费端缓存时间戳就可以保证消息的有序消费。

在上述场景中是先同步写入MySQL,再获取商品全量数据,接着再异步发送消息。这一系列的步骤可以通过接MySQL的binlog实现,在同步写入MySQL后,MySQL发送binlog变更,通过阿里巴巴Canal中间件接收MySQL的binlog变更再发送消息到消息队列。

这是一个能给程序员加buff的公众号 (CoderBuff)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 5.比较排序之归并排序(非递归)

      在上一节中讲解了归并排序的递归版《4.比较排序之归并排序(递归)》,通常来讲,递归版的归并排序要更为常用,本节简单介绍下非递归版的归并排序。思路和递归版相...

    用户1148394
  • 缓存的简单实现

    //此文基于《Java并发编程实践》 我们都知道在应用程序中合理地使用缓存,能更快的访问我们之前的计算结果,从而提高吞吐量。例如Redis和Memcached基...

    用户1148394
  • 组合模式

    在之前有接触过组合模式,那是第一次接触设计模式,觉得什么是组合模式啊?什么部分与整体。现在再来看组合模式,觉得实际上并没有那么神秘。 组合模式:将对象组合成树形...

    用户1148394
  • 消费端如何保证消息队列MQ的有序消费

    消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端...

    Java_老男孩
  • ActiveMQ介绍

    1、ActiveMQ服务器工作模型       通过ActiveMQ消息服务交换消息。消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。这些消...

    小帅丶
  • 快速学习-RocketMQ设计理念

    消息存储是RocketMQ中最为复杂和最为重要的一部分,本节将分别从RocketMQ的消息存储整体架构、PageCache与Mmap内存映射以及RocketMQ...

    cwl_java
  • Kafka消息规范

    Kafka作为一个消息队列,有其自己定义消息的格式。Kafka中的消息采用ByteBuf,之所以采用ByteBuf这种紧密的二进制存储格式是因为这样可以节省大量...

    shysh95
  • 直播评论系统分析设计

    直播评论系统是电商系统一个常用的功能,即在发布新品的时候,为了吸引用户参与和营造互动气氛,让参与的每个人都可以发消息,发完后每个人都可以即时看到新消息,原型和 ...

    心平气和
  • 消息队列设计1 何时需要

    消息队列通常作为企业IT系统内部通信的核心手段 具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。

    JavaEdge
  • 消息队列| RocketMQ 核心原理

    a. fork下来,起一个demo,上一个测试环境,遇到问题再去社区提问或找些实践文章;

    heidsoft

扫码关注云+社区

领取腾讯云代金券