用MQ时,要注意消息数据:
若这是用MQ传递非常核心的消息,如计费系统,就是很重的业务,操作很耗时,设计上经常将计费做成异步化,就是用MQ。
为确保MQ传递过程中不会弄丢计费消息。广告主投放个广告,说好用户点击一次扣1块。结果要是用户动不动点击了一次,扣费时搞的消息丢了,公司就会不断少几块。
MQ丢数据,一般分两种:
生产者将数据发送到MQ时,因为网络等问题,数据在半路丢了。
但MQ事务机制一搞,因为太耗性能,吞吐量就会降低。所以一般若你要确保写RabbitMQ的消息别丢,可开启confirm模式。
在Pro开启confirm模式后,你每次写的消息都会分配一个唯一id,然后若写入RabbitMQ,RabbitMQ会给你回传一个ack消息,告诉你说这个消息ok了。
若RabbitMQ未能处理该消息,就会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。可结合该机制,自己在内存里维护每个消息id的状态,若超过一定时间还没接收到该消息的回调,你就能重发。
所以一般在生产者这块避免数据丢失,都用confirm机制。
RabbitMQ自己丢了数据,这必须开启RabbitMQ持久化:消息写入后会持久化到磁盘,哪怕是RabbitMQ自己挂了,恢复后会自动读取之前存储的数据。
罕见的是,RabbitMQ还没持久化,自己就挂了,可能导致少量数据会丢失的。
必须同时设置这两个持久化,RabbitMQ哪怕是挂了,再次重启后,也会从磁盘上重启恢复queue,恢复该queue里的数据。
而且持久化可跟Pro的confirm机制配合,只有消息被持久化到磁盘后,才会通知Pro ack,所以哪怕是在持久化到磁盘前,rabbitmq挂了,数据丢了,生产者收不到ack,也可以自己重发。
你消费到了数据之后,消费者会自动通知RabbitMQ说ok ,我已经消费完这条消息了。
然而可能刚消费到消息,还没处理,Con进程挂了,重启后,RabbitMQ认为你都消费了,这数据就丢了。
用RabbitMQ提供的ack机制,关闭RabbitMQ自动ack,可通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack。
这样若你还没处理完,就不会ack,RabbitMQ就认为你还没处理完,这时RabbitMQ会把这个消费分配给别的consumer处理,不会丢消息。
唯一可能导致Con丢数据case:消费到了该消息,然后Con自动提交了offset,让kafka以为你已消费完该消息,然而其实你刚准备处理这消息,你还没处理完,你就挂了,这条消息就丢了。 因此,得关闭自动提交offset,在处理完后Con手动提交offset,即可保证数据不会丢。
但此时确实会重复消费,如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己设计保证幂等即可。
kafka某broker宕机,然后重新选举partiton的leader时。若此时其他follower刚好还有些数据没同步,结果此时leader挂了,然后选举某个follower成leader后,就丢了一些数据。
一般要求起码设置如下参数:
配置后,至少在kafka Broker端能保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。
如按上述思路设置ack=all,一定不会丢,要求是,你的leader接收到消息,所有follower都同步到了消息之后,才认为本次写成功了。 如果没满足这条件,生产者会自动不断重试,重试无限次。
RocketMQ 导致数据丢失的原因与前面的 RabbitMQ 和 Kafka 都很类似。生产者就是因为网络抖动等原因消息投递失败,或者 RocketMQ 自身的 Master 节点故障,主备切换故障之类的,消费者则有可能是异步处理导致还未处理成功就给 RocketMQ 提交了 offset 标识消息已处理了。
在 RocketMQ 中,事务消息可以保证消息零丢失。RocketMQ 的事务消息流程大致如下图所示:
在上面的事务消息流程中,基于这三个业务流程:发送 half 消息 -> 处理其他业务 -> commit/rollback。我们来讨论下面的几种情况:
万一生产者发送 half 消息失败,怎么办?
可以做重试或记录消息到如文件、数据库等地方,直接给用户返回失败,本次请求失败。
本文分别从生产者、MQ 自身、消费者介绍了导致消息丢失的原因,消息丢失问题是一个比较常见但又必须解决的问题。
不同的 MQ 如何解决消息丢失问题的。消费端导致的消息丢失都是由于数据还未处理成功确提前通知 MQ 消息已经处理成功了,禁止自动提交或异步操作即可,处理起来比较简单;生产者和 MQ 自身导致的消息丢失则比较难处理,RabbitMQ 使用了 Confirm 模式避免消息丢失;Kafka 则配置所有 follower 同步成功才给生产者响应推送消息成功;RocketMQ 则使用事务消息来保证消息的零丢失,针对不同的异常情况还提供了补偿机制进行处理。