前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

RocketMQ系列 | 容量削峰填谷后,发送的消息“少”了怎么办!!??

作者头像
烟雨平生
发布2023-10-06 20:37:55
1790
发布2023-10-06 20:37:55
举报
文章被收录于专栏:数字化之路数字化之路

业务背景

一个业务实体的属性出现变更,会刷新用户域、订单域、商品域等多个域冗余的数据。变更数据涉及到的数据量大时,会比较耗时、耗内存。

如果同时变更的数据较多时,就超过当前服务的容量,JVM会频繁FullGC,继而pod重启。数据变更的功能不可用了。

解决方案1:加机器。增加系统容量 解决方案2:数据源更新与刷新其它业务域的冗余数据解耦。使用MQ来异步刷新冗余的数据实现容量的削峰填谷。

最终使用了方案2,使用目前项目中使用的消息中间件RocketMQ。原因是这个场景并不高频,可能通过控制MQ消费线程数来减少对机器资源的消耗。此处设置为2

方案2上线运行一段时间后,出现一个现象:

变更的事件消息会偶发性的丢失

现象:

1、可以找到到发送成功的日志。

2、疑似丢失的消息,在用户域、商品域找到接收消息并消费成功的日志,但在订单域中没有找到接收消息的日志。

3、订单域一直在刷新冗余数据。未消费且过期的消息,会被Rocket服务端删除。

解决办法:

1、优化数据刷新的逻辑,减少对内存的消耗。 通过翻页获取数据的方式小步快走的方式小批量获取数据、刷新数据。

2、增加RocketMQ的消费线程数。从2调整为8。

事件消息会偶发性丢失的原因分析

过期清理机制引发消息丢失: 消息按照到达服务器的先后顺序被存储到队列中,理论上每个队列都支持无限存储。 但是在实际部署场景中,服务端节点的物理存储空间有限,消息无法做到永久存储。 RocketMQ 使用存储时长作为消息存储的依据。 在存储时长范围内的消息都会被保留,无论消息是否被消费; 超过时长限制的消息则会被清理掉。 JackieTang,公众号:的数字化之路RocketMQ系列 | 如何让消息“丢失”?

RocketMQ如何判定一个消息有没有过期呢?

要讲清楚这个问题,就不得不先聊明白消费进度管理。

消费进度原理

消息位点(Offset)

RocketMQ领域中消息是按到达服务端的先后顺序存储在指定主题[Topic]的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。 任意一个消息队列在逻辑上都是无限存储,即消息位点会从0到Long.MAX无限增加。通过主题、队列和位点就可以定位任意一条消息的位置,具体关系如下图所示:

RocketMQ领域中定义队列中 最早一条消息的位点为最小消息位点(MinOffset); 最新一条消息的位点为最大消息位点(MaxOffset)。

虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限,RocketMQ会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点(ConsumerOffset)

RocketMQ领域模型为发布订阅模式,每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,RocketMQ通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。 如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点

那么,历史消息会保存多久呢?也就是如何判定一个消息在服务端有没有过期呢? 看情况。不同的RocketMQ服务器都会不同。以阿里的云消息队列RocketMQ版为例:

  • 5.0系列实例:
    • 最短24小时。
    • 最长720小时。
  • 4.0系列实例:
    • 标准版:存储时长为3天,超过时间将自动滚动删除。
    • 企业铂金版:存储时长为3天,若您购买实例的存储空间规格不足时,云消息队列 RocketMQ 版会按存储时间滚动删除最早的消息,此时消息的存储时长可能不足3天。

队列中消息位点MinOffset、MaxOffset和每个消费者分组的消费位点ConsumerOffset的关系如下:

  • ConsumerOffset≤MaxOffset:
    • 当消费速度和生产速度一致,且全部消息都处理完成时,最大消息位点和消费位点相同,即ConsumerOffset=MaxOffset。
    • 当消费速度较慢小于生产速度时,队列中会有部分消息未消费,此时消费位点小于最大消息位点,即ConsumerOffset<MaxOffset,两者之差就是该队列中堆积的消息量。
  • ConsumerOffset≥MinOffset:正常情况下有效的消费位点ConsumerOffset必然大于等于最小消息位点MinOffset。消费位点小于最小消息位点时是无效的,相当于消费者要消费的消息已经从队列中删除了,是无法消费到的,此时服务端会将消费位点强制纠正到合法的消息位点。

消费位点初始值

消费位点初始值指的是消费者分组[Group ID]首次启动消费者消费消息时,服务端保存的消费位点的初始值。

RocketMQ定义消费位点的初始值为消费者首次获取消息时,该时刻队列中的最大消息位点。相当于消费者将从队列中最新的消息开始消费

小结

结合消费进度管理和目前遇到的因为消费慢引发的消息丢失问题,我们来还原下消息丢失的原因:

事件消息发出后,由于订单域消费消息的速度低于生产,然后出现消息堆积。 订单服务上线新需求,老的RocketMQ消费客户端下线。 上线完成后,启动新的RocketMQ消费客户端。 新的RocketMQ消费者[Group ID]从RocketMQ Broker服务器拉取消息。 如果RocketMQ服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。 这些未消费且过期的消息,就会被删除。 从业务上看,这些消息是丢失了。

实际上,即使订单服务没有重新发布,也会出现消息丢失。 因为过期的消息已经从RocketMQ服务端自动滚动删除了

参考

消费进度管理:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-progress-management

心情点滴

中秋节好像像平时一样,准备家常便饭。 妈妈说:“大人过不过节都一样,小孩子过节要有一个过节的样。” 然后一块做了一顿丰富的饭,孩子吃的开心,感受到节日的不同。 同时自己也觉得一下子有了家的样子,有了家该有温度。一下子满足了对家的想象

有妈的日子,真好

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-10-01 20:18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 的数字化之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消费进度原理
    • 消息位点(Offset)
      • 消费位点(ConsumerOffset)
        • 消费位点初始值
        相关产品与服务
        对象存储
        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档