前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3分钟白话RocketMQ系列—— 如何保证消息不丢失

3分钟白话RocketMQ系列—— 如何保证消息不丢失

作者头像
阿丸笔记
发布2023-10-22 17:32:38
3690
发布2023-10-22 17:32:38
举报
文章被收录于专栏:阿丸笔记阿丸笔记
回顾上一篇核心概念,我们知道RocketMQ的消息模型分为 生产、存储(消息堆积)、消费 三大部分。

消息模型三大部分

因此,如何保证消息不丢失,也是从这三个环节来考虑。

关键字摘要

  • 生产、存储(消息堆积)、消费 三个环节保证消息不丢失
  • 生产环节:消息类型,消息确认机制、失败重试机制
  • 存储环节:同步/异步刷盘、同步/异步复制slave
  • 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制

Q1: 如何保证「消息生产」不丢失?

先想想什么情况下,消息生产会丢失消息呢?

生产者将发送消息时,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失。

那怎么解决这个问题?

其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」

消息发送成功返回确认消息,那就能确保消息不丢失。如果发送失败了,mq-client就尝试自动重试,避免网络抖动导致发送丢失。

如果超过一定超时时间还是失败,那就抛出异常,由开发者自己在应用层面进行处理,手动重试发送 或者 记录失败消息后续补偿。

不过我们需要特别注意是,RocketMQ支持多种「消息类型」,但是并不是对所有「消息类型」 都会有「消息确认机制」和「失败重试机制」。

RocketMQ生产消息时,支持多种「消息类型」和「消息发送模式」。咱们白话为主,就不展开源码了,有兴趣同学可以参考org.apache.rocketmq.client.producer.MQProducer这个接口定义即可。

消息类型:

  • 普通消息:发送普通消息,异常时默认重试
  • 普通有序消息:发送普通有序消息,通过指定「消息筛选器selector」,动态决定发送哪个队列。异常默认不重试,可以用户自己重试,并发送到其他队列
  • 严格有序消息:发送严格有序消息,通过指定队列,保证严格有序,异常默认不重试

消息发送模式:

  • 同步:调用发送消息方法后,同步阻塞,直到返回SendResult。配置retryTimesWhenSendFailed重试次数。
  • 异步:调用发送消息方法后,立即返回,发送结果会通过开发者自己注册的回调函数SendCallback进行处理。配置retryTimesWhenSendAsyncFailed重试次数。
  • 单向发送:这种方法完全不关心发送后的返回结果。显然,它具有最大吞吐量,但也存在消息丢失的潜在风险。

消息类型 和 消息发送模式 是 N * M 的关系,所以聪明的你一定已经想到了,存在9种不同组合,RocketMQ也是定义了9种不同接口方法。

这9种方法里面,涉及到「单向发送」模式的3种方法,都是不可靠的,存在丢失消息的风险。其他发送消息的模式和消息类型,可以通过 消息确认、mq-client自动「失败重试机制」、业务自定义重试 等方式,确保消息发送不丢失。

注意,org.apache.rocketmq.client.producer.MQProducer还定义了「事务消息」的发送模式,是属于分布式事务范畴了,跟我们这里讨论的消息不丢失不太一样,就不展开讨论了。后面单独写一篇针对「事务消息」的分析。

Q2: 如何保证「消息存储」不丢失?

先想想什么情况下,消息存储会丢失呢?

场景1,消息保存到内存中,还没来得及刷盘到磁盘,机器宕机或者重启,导致内存中消息丢失。 场景2,为了提高可用性,Broker通常采用一主多从的部署方式,为了确保消息不丢失,消息需要被复制到从节点。当消息发送到master但是还没同步到slave broker时,master broker磁盘损坏,导致消息数据丢失。或者master宕机,consumer切换到slave消费数据,消息丢失。

针对场景1,默认情况下,消息在到达 Broker 端后会首先被保存在内存中,并立即向生产者返回确认响应。随后,Broker 会定期批量将一组消息异步刷入磁盘。这种方式减少了 I/O 操作次数,提高了性能。

然而,如果发生机器掉电、异常宕机等情况,未及时将消息刷入磁盘,就可能导致消息丢失的情况。

如果要确保 Broker 端不丢失消息并保证消息的可靠性,我们需要修改消息保存机制为同步刷盘方式,即只有当消息成功存储到磁盘后才返回响应。可以通过flushDiskType = SYNC_FLUSH 参数进行控制。

针对场景2,在默认方式下,当消息成功写入主节点时,就会返回确认响应给生产者,并异步将消息复制到从节点。然而,如果主节点突然宕机且无法恢复,尚未复制到从节点的消息将会丢失。

为了进一步提高消息的可靠性,我们可以采用同步复制方式。主节点将会同步等待从节点完成复制,然后才返回确认响应。这样可以确保消息的可靠性。可以通过brokerRole=SYNC_MASTER参数进行控制。

注意,同步刷盘 和 同步复制 虽然能够保证消息不丢失,但是会严重降低性能,生产实践中需要根据实际情况综合评估。

Q3: 如何保证「消息消费」不丢失?

先想想什么情况下,消息存储会丢失呢?

因为各种原因消费失败,但是还是提交了消费位点,这条消息从业务角度来说就“丢失”了。

那怎么解决这个问题?

跟消息生产一样,其实思路是比较直接的,就是 「消息确认机制」和「失败重试机制」

消费者从RocketMQ拉取消息后,需要返回"CONSUME_SUCCESS"来表示业务方已经正常完成消费。只有返回"CONSUME_SUCCESS"才算作消费完成。这就是消费时的「消息确认机制」。

如果返回"CONSUME_LATER",则会按照不同的消息延迟级别进行再次消费,延迟级别从秒到小时不等,最长延迟时间为2个小时后再次尝试消费。这就是消费时的「失败重试机制」。

重试消息会被存入名为 "%RETRY%+消费组名称" 的Topic中,原始主题Topic会存入属性中。然后会基于定时任务机制,在到期时将任务再次拉取出来。

另外,RocketMQ跟kafka不同的是,天然支持了 「死信队列机制」

如果在尝试消费的过程中达到了最大重试次数(通常为16次),仍然无法成功消费,则消息将被发送到死信队列,以确保消息存储的可靠性。后续业务可以根据死信队列,来做相关补偿措施。

关键字总结

  • 生产、存储(消息堆积)、消费 三个环节保证不丢失
  • 生产环节:消息类型,消息确认机制、失败重试机制
  • 存储环节:同步/异步刷盘、同步/异步复制slave
  • 消费环节:消息确认机制(至少消费成功一次)、失败重试机制、死信队列机制

3分钟到了吗?应该对RocketMQ如何生产消息有全面了解了吧。 如果还想了解更多,欢迎关注下一期内容。

往期热门笔记合集推荐:

原创:阿丸笔记,欢迎 分享,转载请保留出处。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-09-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 阿丸笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关键字摘要
  • Q1: 如何保证「消息生产」不丢失?
  • Q2: 如何保证「消息存储」不丢失?
  • Q3: 如何保证「消息消费」不丢失?
  • 关键字总结
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档