专栏首页Java识堂RocketMQ消息为什么会被重复消费?

RocketMQ消息为什么会被重复消费?

从上帝视角看一下消息发送和消费

当我们使用RocketMQ时,RocketMQ-Dashboard是一个非常好用的图形化界面工具

我们首先在RocketMQ-Dashboard上创建一个topic,每个topic下4个队列

每个topic是一类消息的集合,topic下面再细分queue是为了提高消息消费的并发度

「当producer发送topic消息时,应该往topic下的哪个queue来发送呢?」

producer会采用轮询的策略发送

「那么consumer应该消费哪个queue下的消息呢?」

当有一个消费者时当然是消费所有的queue

「如果有多个消费者呢?」

只需要根据各种负载均衡策略将队列分配给消费者即可,如下图是两种负载均衡的方式

你问我这两种负载策略怎么实现的?去看看源码呗,详细过程我就不分析了

「如果消费者数量超过队列的数量会发生什么?」

多出来的消费者将不会消费任何队列

「为什么一个consumer只能消费一个queue呢?」

多个消费者消费一个queue肯定会有并发问题,所以得加锁,这样还不如把topic下的队列数量设置的多一点

「我在运行的过程中可以设置topic下queue的数量吗?」

当然可以。不仅可以重新设置queue的数量,还可以实时增减consumer,以应对不同流量的场景

「那这样说当queue或者consumer的数量发生变化的时候,需要重新执行负载均衡吧?」

是的,大家一般把这个过程叫做重平衡

下面我们来分享一下详细的细节

消息发送流程

消息发送主要有3种方式单向发送(只发送,不管结果),同步发送和异步发送

消息消费流程

消息是基于推还是拉?

消息消费的模式有两种方式:

  1. 拉取:Consumer不断从Broker拉取
  2. 推送:Broker向Consumer推送

这两种方式都有各自的缺点:

  1. 拉取:拉取的间隔不好确定,间隔太短没消息时会造成带宽浪费,间隔太长又会造成消息不能及时被消费
  2. 推送:「推送和速率难以适配消费速率」,推的太快,消费者消费不过来怎么办?推的太慢消息不能及时被消费

「看起来拉取和推送难以抉择」

然后就有大佬把拉取模式改了一下,即不会造成带宽浪费,也能基于消费的速率来决定拉取的频率!

「你猜怎么改的?」

其实很简单,Consumer发送拉取请求到Broker端,如果Broker有数据则返回,Consumer端再次拉取。如果Broker端没有数据,不立即返回,而是等待一段时间(例如5s)。

  1. 如果在等待的这段时间,有要拉取的消息,则将消息返回,Consumer端再次拉取。
  2. 如果等待超时,也会直接返回,不会将这个请求一直hold住,Consumer端再次拉取

「对了,这种策略就叫做长轮询」

「RocketMQ中有拉和推两种消费方式,但是推是基于长轮询做的」

具体消费流程

「拉取到消息后是怎么处理的呢?」

PullRequest类的成员变量如下图

当拉取到消息后,消息会被放入msgTreeMap,其中key为消息的offset,value为消息实体

「另外还有一个重要的属性dropped,和重平衡相关,重平衡的时候会造成消息的重复消费,具体机制不分析了,看专栏把」

msgCount(未消费消息总数)和msgSize(未消费消息大小)是和流控相关的

「什么是流控呢?」

就是流量控制,当消费者消费的比较慢时,减缓拉取的速度。如下图

当从阻塞队列中获取PullRequest时,并不会直接发起网络请求,而是先看看是否触发流控的规则,比如未消费的消息总数超过一定值,未消费的消息大小超过一定值等

接着就是收到响应,处理消息,并键PullRequest再次放入阻塞队列.

「是不是落了一个步骤?就是Consumer告诉Broker这部分消息我消费了?」

嗯嗯,你是不是以为提交offset的过程是同步的?其实并不是,「是异步的」

Consumer怎么提交offset?

当consumer消费完消息只是将offset存在本地,通过定时任务将offset提交到broker,另外broker收到提交offset的请求后,也仅仅是将offset存在map中,通过定时任务持久化到文件中

「这样就会造成消息的重复消费」

  1. Consumer消费完消息并不是实时同步到Broker的,而是将offset先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致offset没提交,下次没提交offset的这部分消息会被再次消费
  2. 即使offset被提交到了Broker,在还没来得及持久化的时候Broker宕机了,当重启的时候Broker会读取consumerOffset.json中保存的offset信息,这就会导致没持久化offset的这部分消息会被再次消费
文章分享自微信公众号:
Java识堂

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

作者:小识
原始发表时间:2022-03-28
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • 消息中间件—RocketMQ消息消费(三)(消息消费重试)

    摘要:如果Consumer端消费消息失败,那么RocketMQ是如何对失败的异常情况进行处理? 前面两篇RocketMQ消息消费(一)/(二)篇,主要从Pus...

    用户2991389
  • 面试官:RocketMQ 如何保证消息不丢失,如何保证消息不被重复消费?

    点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 |...

    芋道源码
  • 消息中间件—RocketMQ消息消费(一)

    文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几...

    用户2991389
  • Redis消息队列重复消费问题

    我们目前项目中短信模块就是采用的 Redis 来作消息队列,起因是最近有应用反映下发短信时,偶尔会有发送两次的情况。

    Li_XiaoJin
  • RocketMQ主从如何同步消息消费进度?

    如果消费者消费模式不同,也会有不同的保存方式,消费者端的消息消费进度保存到 OffsetStore 中,他有两个实现类:

    张乘辉
  • 探索RocketMQ的重复消费和乱序问题

    但MQ在实际应用中不是说保证消息不丢失就万无一失了,它还有两个令人头疼的问题:重复消费和乱序。

    lbyxiaolizi
  • 探索RocketMQ的重复消费和乱序问题

    但MQ在实际应用中不是说保证消息不丢失就万无一失了,它还有两个令人头疼的问题:重复消费和乱序。

    HUC思梦
  • 消息队列之kafka的重复消费

    Kafka 是对分区进行读写的,对于每一个分区的消费,都有一个 offset 代表消息的写入分区时的位置,consumer 消费了数据之后,每隔一段时间,会把自...

    conanma
  • 《RabbitMQ》如何保证消息不被重复消费

    为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。

    Java旅途
  • RocketMQ系列(三)消息的生产与消费

    前面的章节,我们已经把RocketMQ的环境搭建起来了,是一个两主两从的异步集群。接下来,我们就看看怎么去使用RocketMQ,在使用之前,先要在NameSer...

    小忽悠
  • RocketMQ系列(三)消息的生产与消费

    前面的章节,我们已经把RocketMQ的环境搭建起来了,是一个两主两从的异步集群。接下来,我们就看看怎么去使用RocketMQ,在使用之前,先要在NameSer...

    小忽悠
  • 消息中间件—RocketMQ消息消费(二)(push模式实现)

    摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—Roc...

    用户2991389
  • 使用Disruptor完成多个消费者不重复消费消息

    上一篇https://blog.csdn.net/tianyaleixiaowu/article/details/79787377里讲了Disruptor完成多...

    天涯泪小武
  • 如何保证消息不被重复消费?(如何保证消息消费时的幂等性)?

    消息重复和幂等问题是很常见的问题,这俩问题基本可以放在一起。 既然是消费消息,那肯定要考虑考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异...

    名字是乱打的
  • 面试题:如何保证消息不被重复消费?

    其实这是很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是...

    用户1263954
  • Spring Cloud Stream如何处理消息重复消费?

    最近收到好几个类似的问题:使用Spring Cloud Stream操作RabbitMQ或Kafka的时候,出现消息重复消费的问题。通过沟通与排查下来主要还是用...

    程序猿DD
  • mq要如何处理消息丢失、重复消费?

    如果要你实现一个支付宝向余额宝转账的功能,比如:账户a从支付宝转出5000余额宝转入5000,该怎么做呢?

    苏三说技术
  • 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

    其实这是很常见的一个问题,这俩问题基本可以连起来问。既然是消费消息,那肯定要考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别造成系统异常可以吗?这个是...

    小东啊

扫码关注腾讯云开发者

领取腾讯云代金券