首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ消费进度管理浅析

幂等性的取与舍

分布式平台上幂等性相关语义的保证,是我们构造安全、可信赖系统的永恒追求。作为异步、解耦通常实现方案下的最优选,我时常思考Rocket MQ设计者经历怎样的断舍离?

众所周知消息队列关于消息消费这一概念的落地实现,大体上分为三种情形:

At most  once

At least once

Exactly  once

翻译一下就是:

至多消费一次

至少消费一次

保证消费一次

很显然如果至多消费一次,势必造成消息丢失;至少消费一次就对我们的业务系统提出更高的要求,保证消费一次看似美好时则需要MQ系统背负沉重代价。Rocket MQ丝毫不犹豫的选择At least once。将幂等的保证大胆的交给开发者,不仅仅体现作者对MQ性能与功能两者矛盾的无奈,同时也体现了对广大开发人员的信任。

消费现状概述

上述论调虽然客观真实但不免有些悲观主义的意味,按照上文的理解我们业务体统需要倚仗ta,但我们又要时刻防备ta,因为一个不小心可能就会出纰漏,这还真是一个让人又爱又怕的存在。

读到这里,笔者似乎把ta描绘成了一个顽皮的孩子,但其实有些言重了,因为以我阅读源码的理解,业务系统没有异常,MQ所在的物理运行环境又比较健康的情况下,其实比较难以出现多次重复消费。

RocketMQ的幂等往往是由业务系统的异常逻辑,或者网络,或者不确定的运行环境破坏的。绝大多数情形下确定无疑ta依然是一个Good Boy。

按照我们对消息系统的朴素理解,消息的消费过程满足以下几个规律:

虽然不会严格的按照投递顺序进行消费,但大体上保持先进先出这个趋势

消息应该被精确的记录当前消费状态

总有一个角色负责统计、持久化消费偏移量

带着经验主义我们看看作者都为平稳消费与进度管理做出了哪些努力。

注:Rocket MQ的顺序消费模型是可以严格保证顺序的。

OffsetStore

消息被消费后也就失去了在ProcessQueue中停留的资格,ProcessQueue会删除该消息,并返回当前的最小偏移量放置到消息进度表中。很容易想象,如果这个消费进度不加以持久化,那么每次启动都要重头消费,显然无法接受,可是如何持久化,又持久化到何处呢?

Rocket MQ支持两种订阅模式:

集群消费模式:默认的消费模式,所有消息只需要被同组任一消费者消费一次即可,大家共享订阅Topic下的消费偏移量。

广播消费模式:各个消费者的消费行为是完全独立的,订阅Topic下所有的消息都需要被该组下所有消费者消费。

针对两种消费模型的特性,容易发现二者并不好一概而论,理想的实现是划分为两个策略,一个集中到Broker管理,一个分散出去由消费者管理。OffsetStore接口负责相关事宜,源码应证了我们猜想。      先来看看OffsetStore接口定义:

较之源码,方法排列被我调换了顺序,需要着重关注的我放到了后面。

注:如果没有Rocket MQ源码阅读经历ProcessQueue显得有些突兀,你可以将ta理解为消息在Consumer端的载体、物理队列某一个截取片段。作者如此定义ta:Queue consumption snapshot

LocalFileOffsetStore

广播模式下消息进度保留在Consumer端,文件遵守约定放置在可配置的固定目录下,文件路径如下:

默认在用户路径下一层创建一个".rocketmq_offsets"文件夹,注意这里有一个细节,文件夹以"."开头,在Linux系统中属于隐藏文件,需要加-a参数才能被显示。为了便于理解,下图展示了一个文件夹路径和一个Offset持久化文件的路径。

广播模式下Consumer#start()之后会调用OffsetStore.load()来加载消费进度,其原理就是根据约定拼接处文件全路径之后读取相应文件,然后序列化为OffsetSerializeWrapper对象:

假设我们有个发送短信的服务订阅"SMS_prod"Topic,那么形成的Json如下所示:注意offsetTable属性也是一个Json,而且key是MessageQueue对象,valule是一个数字表示偏移量。

既然可以在指定文件load关键信息,自然就有相关机制负责写入。还记得上文提到的persistAll方法吗?

对offsets.json的相关操作都被封装在MixAll工具类中:

MixAll.file2String: 将文件读取出来

MixAll.string2File: 将序列化好的对象写入文件

RemoteBrokerOffsetStore

因为偏移量维护在Broker端,所以该实现的load方法仅仅是一个声明。构造方法不需要计算文件路径也尤为简单,二者的offsetTable属性是一致的。我们着重来看看集群消费模式下如何保存消息消费进度。

不用深入研究,我们应该能发现至少两处不同:

粒度不同:广播模式是直接一下子把整个offsetTable持久化,而集群模式细化到了entry级别。

调用方式不同:广播模式是直接JVM内部调用写入文件即可,而集群模式需要RPC调用参与。

这里有必要强调一下二者产生的offset.json文件也是有区别的,下文我会分析,同时也带大家了解该RPC过程。

追踪源码发现,其实每次Consumer进行RPC调用上报自己的消费进度,Broker接收之后并没有立即进行持久化,而是直接更新到内存中。

TOPIC_GROUP_SEPARATOR为定义的常量: "@",之前我们提到过二者json有些许区别,offsetTable的key变成了一个拼接出来的字符串,该字符串左侧是TopicName,右侧是ConsumeGroupName中间用@符号连接。方便理解,我把这个json也展示出来:

持久化

两种文件持久化机制没有什么大的区别定时任务触发,或者消费端正常关闭执行shotdown()之前手动触发。

广播模式定时任务定义在MQClientInstance中,MQClientInstance对象在被实例化之后调用start()时启动该定时任务。定时任务的时间间隔支持配置默认是5000ms,延时10000ms之后开始执行。

集群模式定时任务定义BrokerController中,BrokerController对象在被实例化之后会有一系列初始化动作,initialize()会启动该定时任务。定时任务的时间间隔支持配置默认是5000ms,延时10000ms之后开始执行。

重复消费

原理分析了那么久,我想要传达的观点就是正常使用的前提下重复消费的原因一定跟offset上报,持久化有关系。

集群消费过程中Consumer意外宕机,offset没有上报导致重复消费

集群消费过程中Broker意外宕机,offset没有将最新的偏移量持久化导致重复消费

广播消费过程Consumer意外宕机,offset没有持久化到本地文件导致重复消费

offset.json文件意外损坏或删除,进度丢失导致重复消费

offset.json文件被篡改,进度不准确导致重复消费

还有一种是因为开发者返回了错误的ACK标示,导致Rocket误判以为消费失败,触发重试逻辑导致的重复消费。

如果本文对您有用,求一个赞

最后一句只对迪丽热巴和吴彦祖可见

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20210928A09UV400?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券