专栏首页Java识堂RocketMQ消息丢失如何排查?

RocketMQ消息丢失如何排查?

消息丢失如何排查?

当我们在使用mq的时候,经常会遇到消息消费异常的问题,原因有很多种,比如

  1. producer发送失败
  2. consumer消费异常
  3. consumer根本就没收到消息

「那么我们该如何排查了?」

其实借助RocketMQ-Dashboard就能高效的排查,里面有很多你想象不到的功能

首先我们先查找期望消费的消息,查找的方式有很多种,根据消息id,时间等

「消息没找到?」

说明proder发送异常,也有可能是消息过期了,因为rocketmq的消息默认保存72h,此时到producer端的日志进一步确认即可

「消息找到了!」

接着看消息的消费状态,如下图消息的消费状态为NOT_ONLINE

「NOT_ONLINE代表什么含义呢?」

别着急,我们一步步来分析,先看看TrackType到底有多少种状态

public enum TrackType {
    CONSUMED,
    CONSUMED_BUT_FILTERED,
    PULL,
    NOT_CONSUME_YET,
    NOT_ONLINE,
    UNKNOWN
}

每种类型的解释如下

类型

解释

CONSUMED

消息已经被消费

CONSUMED_BUT_FILTERED

消息已经投递但被过滤

PULL

消息消费的方式是拉模式

NOT_CONSUME_YET

目前没有被消费

NOT_ONLINE

CONSUMER不在线

UNKNOWN

未知错误

「怎么判定消息已经被消费?」

上一节我们讲到,broker会用一个map来保存每个queue的消费进度,「如果queue的offset大于被查询消息的offset则消息被消费,否则没有被消费」(NOT_CONSUME_YET)

我们在RocketMQ-Dashboard上其实就能看到每个队列broker端的offset(代理者位点)以及消息消费的offset(消费者位点),差值就是没有被消费的消息

当消息都被消费时,差值为0,如下图所示

「CONSUMED_BUT_FILTERED表示消息已经投递,但是已经被过滤掉了」。例如producer发的是topicA,tagA,但是consumer订阅的却是topicA,tagB

「CONSUMED_BUT_FILTERED(消息已经被投递但被过滤)是怎么发生的呢?」

这个就不得不提到RocketMQ中的一个概念,「消息消费要满足订阅关系一致性,即一个consumerGroup中的所有消费者订阅的topic和tag必须保持一致,不然就会造成消息丢失」

如下图场景,发送了4条消息,consumer1订阅了topica-taga,而consumer2订阅了topica-tab。consumer1消费q0中的数据,consumer2消费q1中的数据

投递到q0的msg-1和msg-3只有msg-1能被正常消费,而msg-3则是CONSUMED_BUT_FILTERED。因为msg-3被投递到q0,但是consumer1不消费tagb的消息导致消息被过滤,造成消息丢失

同理msg-2这条消息也会丢失

「注意,还有一个非常重要的点」

虽然消息消费失败了,但是消息的offset还会正常提交,即 「消息消费失败了,但是状态也会是CONSUMED」

「RocketMQ认为消息消费失败需要重试的场景有哪些?」

  1. 返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  2. 返回null
  3. 主动或被动抛出异常

「那么消费失败的消息去哪了呢?」

当消息消费失败,会被放到重试队列中,Topic名字为%RETRY% + consumerGroup

「Consumer没订阅这个topic啊,怎么才能消费到重试消息?」

其实在Consumer启动的时候,框架内部帮你订阅了这个topic,所以重试消息能被消费到

「另外消息不是一直重试,而是每隔1段时间进行重试」

第几次重试

与上次重试的间隔时间

第几次重试

与上次重试的间隔时间

1

10 秒

9

7 分钟

2

30 秒

10

8 分钟

3

1 分钟

11

9 分钟

4

2 分钟

12

10 分钟

5

3 分钟

13

20 分钟

6

4 分钟

14

30 分钟

7

5 分钟

15

1 小时

8

6 分钟

16

2 小时

当消息超过最大消费次数16次,会将消息投递到死信队列中,死信队列的topic名为%DLQ% + consumerGroup。

「因此当你发现消息状态为CONSUMED,但是消费失败时,去重试队列和死信队列中找就行了」

消息消费异常排查实战

这个问题发生的背景是这样的,就是我们有2个系统,中间通过mq来保证数据的一致性,结果有一天数据不一致了,那肯定是consumer消费消息有问题,或者producer发送消息有问题

先根据时间段找到了消息,确保了发送没有问题,接着看消息的状态为NOT_CONSUME_YET,说明consumer在线但是没有消息

「NOT_CONSUME_YET表明消息没有被消费」,但是消息发送都过了好长时间了,consumer不应该没消费啊,查日志consumer确实没有消费。

用RocketMQ-Dashboard查看一下代理者位点和消费者位点,0队列正常消费,其他队列没有被消费

「感觉这个负载均衡策略有点问题啊,怎么0队列这么多消息,别的队列都怎么没消息,问一波中间件的同学,是不是又改负载均衡策略了?」

确实改了!测试环境下,采用队列纬度区分多环境,0是基准环境,我们团队目前还没有用多环境,所以收发消息都会在队列0上,其他队列不会用到(「你可以简单认为测试环境发送和消费消息只会用到0队列」

「那么问题来了!」

首先消息的状态是NOT_CONSUME_YET,说明消息肯定被投递到0队列之外了,但是中间件的小伙伴却说消息不会被投递到0队列

要想验证我的想法首先需要证明没有被消费的消息确实被投递到0队列之外的队列了

中间走的弯路就不说了,直到我看了看RocketMQ-Dashboard的源码,「发现Dashboard其实返回了消息的很多信息,但是并没有在页面展示出来,直接看接口返回」

乖乖,发现了新世界,消息的所有属性都在这了,看到queuId为14,果然验证了我的想法。

再看bornHost居然是我们办公室的网段

「难道本地启动的负载均衡策略和测试环境的负载均衡策略不一样?」

本地debug一波代码,果然是本地的producer会往所有的队列发送消息,并且consumer也会消费所有队列的消息

「至此找出问题了!」

producer在本地启了一个服务,注册到测试环境的zk,测试环境的部分请求打到本地,往0队列之外的队列发了消息,但是测试环境的consumer只会消费0队列中的消息,导致消息迟迟没有被消费

文章分享自微信公众号:
Java识堂

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!

作者:小识
原始发表时间:2022-03-29
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • RocketMQ 消息丢失场景分析及如何解决!

    既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题。在一些涉及到了金钱交易的场景下,消息丢失还是很致命的。那么在RocketMQ中存在哪几种消息丢失的...

    架构师修炼
  • RocketMQ消息丢失解决方案:事务消息

    上篇文章,王子通过一个小案例和小伙伴们一起分析了一下消息是如何丢失的,但没有提出具体的解决方案。

    HUC思梦
  • RocketMQ的消息是怎么丢失的

    通过之前文章的阅读,有关RocketMQ的底层原理相信小伙伴们已经有了一个比较清晰的认识。

    HUC思梦
  • 面试官:RocketMQ 如何保证消息不丢失,如何保证消息不被重复消费?

    点击上方“芋道源码”,选择“设为星标” 管她前浪,还是后浪? 能浪的浪,才是好浪! 每天 10:33 更新文章,每天掉亿点点头发... 源码精品专栏 原创 |...

    芋道源码
  • RocketMQ 一行代码造成大量消息丢失

    错误信息关键点:MQBrokerException:CODE:2 DESC:[TIMEOUT_CLEAN_QUEUE]broker busy,start fl...

    丁威
  • 面试官再问我如何保证 RocketMQ 不丢失消息,这回我笑了!

    最近看了 @JavaGuide 发布的一篇『面试官问我如何保证Kafka不丢失消息?我哭了!』,这篇文章承接这个主题,来聊聊如何保证 RocketMQ 不丢失消...

    andyxh
  • RocketMQ一次延迟消息故障排查【实战笔记】

    RocketMQ社区版本支持18个延迟级别,每个级别在设定的时间都被会消费者准确消费到。为此也专门测试过消费的间隔是不是准确,测试结果显示很准确。然而,如此准确...

    瓜农老梁
  • Kafka —— 如何保证消息不会丢失

    当我们通过 send(msg, callback) 是不是就意味着消息一定不丢失了呢?

    solve
  • RabbitMq如何确保消息不丢失

    上篇写了掌握Rabbitmq几个重要概念,从一条消息说起,这篇来总结关于消息丢失让人头痛的事情。网络故障、服务器重启、硬盘损坏等都会导致消息的丢失。消息从生产到...

    李明成
  • RocketMQ消息丢失解决方案:同步刷盘+手动提交

    之前我们一起了解了使用RocketMQ事务消息解决生产者发送消息时消息丢失的问题,但使用了事务消息后消息就一定不会丢失了吗,肯定是不能保证的。

    HUC思梦
  • rabbitmq如何确保消息不丢失 chengtian

    上篇写了掌握Rabbitmq几个重要概念,从一条消息说起,这篇来总结关于消息丢失让人头痛的事情。网络故障、服务器重启、硬盘损坏等都会导致消息的丢失。消息从生产到...

    跟着阿笨一起玩NET
  • Twitter Storm如何保证消息不丢失

    storm保证从spout发出的每个tuple都会被完全处理。这篇文章介绍storm是怎么做到这个保证的,以及我们使用者怎么做才能充分利用storm的可靠性特点...

    星哥玩云
  • 大数据开发:消息队列如何确保消息不丢失?

    消息队列在大数据技术生态当中,一直都是值得重视的存在,开源的消息队列产品,市面上也不少,基于不同的场景,需要去匹配不同的解决方案。围绕消息队列,今天的大数据开发...

    成都加米谷大数据
  • RocketMQ主从如何同步消息消费进度?

    如果消费者消费模式不同,也会有不同的保存方式,消费者端的消息消费进度保存到 OffsetStore 中,他有两个实现类:

    张乘辉
  • 硬核 | Kafka 如何解决消息不丢失?

    Kafka 消息框架,大家一定不陌生,很多人工作中都有接触。它的核心思路,通过一个高性能的MQ服务来连接生产和消费两个系统,达到系统间的解耦,有很强的扩展性。

    微观技术
  • 硬核 | Kafka 如何解决消息不丢失?

    Kafka 消息框架,大家一定不陌生,很多人工作中都有接触。它的核心思路,通过一个高性能的MQ服务来连接生产和消费两个系统,达到系统间的解耦,有很强的扩展性。

    捡田螺的小男孩
  • kafka是如何保证消息不丢失的

    今天和大家聊一下,kafka对于消息的可靠性保证。作为消息引擎组件,保证消息不丢失,是非常重要的。

    一条老狗
  • 回答面试官:如何保证消息不丢失

    对于这个技术点不知道大家掌握的如何了,消息队列现在应该是公司必备的技能之一了,无论是RabbitMQ还是rocketmq,或者支持大数量的kafka

    Java宝典

扫码关注腾讯云开发者

领取腾讯云代金券