前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息的可靠性传输,如何处理消息丢失问题?

消息的可靠性传输,如何处理消息丢失问题?

作者头像
JavaEdge
发布2022-11-30 15:03:31
1K0
发布2022-11-30 15:03:31
举报
文章被收录于专栏:JavaEdgeJavaEdge

用MQ时,要注意消息数据:

  • 不能多,牵涉重复消费处理和幂等性问题
  • 不能少,消息不能搞丢呀

若这是用MQ传递非常核心的消息,如计费系统,就是很重的业务,操作很耗时,设计上经常将计费做成异步化,就是用MQ。

为确保MQ传递过程中不会弄丢计费消息。广告主投放个广告,说好用户点击一次扣1块。结果要是用户动不动点击了一次,扣费时搞的消息丢了,公司就会不断少几块。

MQ丢数据,一般分两种:

  • MQ自己弄丢了
  • 消费时弄丢了

1.1 生产者丢数据

生产者将数据发送到MQ时,因为网络等问题,数据在半路丢了。

解决方案
事务功能
  • 生产者发数据前,开启事务(channel.txSelect),然后发送消息
  • 若消息未成功被MQ接收到,则Pro会收到异常报错,此时即可回滚事务(channel.txRollback),然后重试发送消息
  • 若收到消息,则可提交事务(channel.txCommit)

但MQ事务机制一搞,因为太耗性能,吞吐量就会降低。所以一般若你要确保写RabbitMQ的消息别丢,可开启confirm模式。

confirm模式

在Pro开启confirm模式后,你每次写的消息都会分配一个唯一id,然后若写入RabbitMQ,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。

若RabbitMQ未能处理该消息,就会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。可结合该机制,自己在内存里维护每个消息id的状态,若超过一定时间还没接收到该消息的回调,你就能重发。

事务机制 V.S cnofirm机制
  • 事务机制是同步的,你提交一个事务后,会阻塞在那儿
  • confirm机制是异步的,你发送个消息后,即可发送下个消息,然后那个消息RabbitMQ接收后,会异步回调你一个接口,通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都用confirm机制。

RabbitMQ丢数据

RabbitMQ自己丢了数据,这必须开启RabbitMQ持久化:消息写入后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复后会自动读取之前存储的数据。

罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的。

设置持久化
  • 创建queue时,将其设置为持久化,保证RabbitMQ持久化queue的元数据,但不会持久化queue里的数据
  • 发送消息时,将消息的deliveryMode设为2:将消息设置为持久化的,此时RabbitMQ就会将消息持久化到磁盘

必须同时设置这两个持久化,RabbitMQ哪怕是挂了,再次重启后,也会从磁盘上重启恢复queue,恢复该queue里的数据。

而且持久化可跟Pro的confirm机制配合,只有消息被持久化到磁盘后,才会通知Pro ack,所以哪怕是在持久化到磁盘前,rabbitmq挂了,数据丢了,生产者收不到ack,也可以自己重发。

Con弄丢数据

你消费到了数据之后,消费者会自动通知RabbitMQ说ok ,我已经消费完这条消息了。

然而可能刚消费到消息,还没处理,Con进程挂了,重启后,RabbitMQ认为你都消费了,这数据就丢了。

解决方案

用RabbitMQ提供的ack机制,关闭RabbitMQ自动ack,可通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack。

这样若你还没处理完,就不会ack,RabbitMQ就认为你还没处理完,这时RabbitMQ会把这个消费分配给别的consumer处理,不会丢消息。

2 Kafka

消费端丢数据

唯一可能导致Con丢数据case:消费到了该消息,然后Con自动提交了offset,让kafka以为你已消费完该消息,然而其实你刚准备处理这消息,你还没处理完,你就挂了,这条消息就丢了。 因此,得关闭自动提交offset,在处理完后Con手动提交offset,即可保证数据不会丢。

但此时确实会重复消费,如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己设计保证幂等即可。

Broker弄丢数据

kafka某broker宕机,然后重新选举partiton的leader时。若此时其他follower刚好还有些数据没同步,结果此时leader挂了,然后选举某个follower成leader后,就丢了一些数据。

一般要求起码设置如下参数:

  • 给topic设置replication.factor参数:必须大于1,要求每个partition必须有至少2个副本
  • 在kafka Broker设置min.insync.replicas参数:必须大于1,要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower
  • Pro端设置acks=all:要求每条数据,必须写入所有replica后,才能认为是写成功
  • Pro端设置retries=MAX(无限次重试):一旦写入失败,就无限重试,卡在这里

配置后,至少在kafka Broker端能保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。

Pro丢数据

如按上述思路设置ack=all,一定不会丢,要求是,你的leader接收到消息,所有follower都同步到了消息之后,才认为本次写成功了。 如果没满足这条件,生产者会自动不断重试,重试无限次。

3 RocketMQ

RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。

在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息流程大致如下图所示:

在上面的事务消息流程中,基于这三个业务流程:发送 half 消息 -> 处理其他业务 -> commit/rollback。我们来讨论下面的几种情况:

万一生产者发送 half 消息失败,怎么办?

可以做重试或记录消息到如文件、数据库等地方,直接给用户返回失败,本次请求失败。

  • 万一生产者发送 half 消息成功,但是处理其他业务失败,又该怎么办呢? 生产者发送 rollback 请求回滚 RocketMQ 中该条消息,本次请求失败。
  • 万一生产者发送 half 消息成功,但是 RocketMQ 由于某些原因如网络超时等导致没有响应,怎么处理? 由于 half 消息已发送成功,此时 RocketMQ 中已经有该条消息了,RocketMQ 会有一个补偿机制,补偿机制会回调你开发好的一个接口,询问你这条消息是要 commit 还是 rollback。
  • 万一生产者发送 half 消息成功,但是请求 commit 或 rollback 的时候失败了呢? 这个问题与上面的问题一样,都是通过 RocketMQ 的补偿机制来处理。

4 总结

本文分别从生产者、MQ 自身、消费者介绍了导致消息丢失的原因,消息丢失问题是一个比较常见但又必须解决的问题。

不同的 MQ 如何解决消息丢失问题的。消费端导致的消息丢失都是由于数据还未处理成功确提前通知 MQ 消息已经处理成功了,禁止自动提交或异步操作即可,处理起来比较简单;生产者和 MQ 自身导致的消息丢失则比较难处理,RabbitMQ 使用了 Confirm 模式避免消息丢失;Kafka 则配置所有 follower 同步成功才给生产者响应推送消息成功;RocketMQ 则使用事务消息来保证消息的零丢失,针对不同的异常情况还提供了补偿机制进行处理。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.1 生产者丢数据
    • 解决方案
      • 事务机制 V.S cnofirm机制
        • RabbitMQ丢数据
        • Con弄丢数据
          • 解决方案
          • 2 Kafka
            • 消费端丢数据
              • Broker弄丢数据
                • Pro丢数据
                • 3 RocketMQ
                • 4 总结
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档