前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何保证消息恰好被消费一次?

如何保证消息恰好被消费一次?

作者头像
JavaEdge
发布2022-11-30 15:58:19
3990
发布2022-11-30 15:58:19
举报
文章被收录于专栏:JavaEdge

全是干货的技术号: 本文已收录在github仓库 Java-Interview-Tutorial,欢迎 star!

0 前言

对系统增加MQ:

  • 对峰值写流量做削峰填谷
  • 对次要业务逻辑做异步
  • 对不同系统模块做解耦

因为业务逻辑从同步代码中移除,所以也要有相应队列处理程序处理消息、执行业务逻辑。随着业务逻辑复杂,会引入更多外部系统和服务,就会越来越多使用MQ,与外部系统解耦合以及提升系统性能。

如系统要加红包功能:用户在购买一定数量商品后,系统给用户发一个现金红包激励用户消费。由于发放红包这种次要业务过程不应在购买商品的主业务流程,所以考虑MQ异步。 但引入 MQ 就必然遇到如下问题:

  • 若消息在投递过程丢失 用户就会因没有得到红包而投诉到你这边
  • 消息在投递过程出现重复 就会因发送两个红包而导致公司资产损失

1 消息为何会丢失?

消息从被写入到MQ,到被消费者消费完成,该链路上的如下场景可能丢失消息:

  • 消息从生产者(后文简称为Pro)写入到MQ的过程
  • 消息在MQ中的存储场景
  • 消息被消费者(后文简称为Con)消费的过程

1.1 在消息生产的过程

消息的Pro一般是业务服务器,MQ独立部署在单独服务器。二者间网络虽是内网,但也存在抖动可能,一旦网络抖动,消息就可能因网络错误而丢失。

1.1.1 解决方案

推荐消息重传:当你发现发送超时后,重发一次消息,但也不能无限重发。一般若不是MQ故障或到MQ的网络断开了,重试2~3次即可。

但这种方案可能造成【消息重复】,从而在消费时重复消费同样的消息。 比方说消息生产时,由于MQ处理慢或网络抖动,导致虽最终写入MQ成功,但对于Pro却是超时的,于是Pro重传这条消息,导致重复消息,你收到了两个现金红包!

1.2 在MQ中

消息在Kafka是存在本地磁盘,为减少消息存储时对磁盘的随机I/O,一般会将消息先写到os的Page Cache,然后择机机刷盘。

如Kafka可配置异步刷盘时机:

  • 当达到某一时间间隔
  • 或累积一定消息数量

假如你经营一个图书馆,读者每还一本书你都要去把图书归位,不仅工作量大且效率低下,但若你能选择每隔3h或图书达到一定数量,再把图书归位,这就能把同一类型的书一起归位,节省查找图书位置的时间,提高人效。

不过若发生掉电或异常重启,Page Cache还没有来得及刷盘的消息就会丢失。这咋办?你可能会:

  • 把刷盘的间隔设置很短
  • 或设置累积一条消息

就刷盘,但频繁刷盘很影响性能,且宕机或掉电几率其实也不高,故不推荐!

若你的系统对消息丢失容忍度很低,可考虑集群部署Kafka,通过部署多个副本备份数据,保证尽量不丢消息。

ISR

Kafka集群中有个Leader,负责消息的写入和消费,可有多个Follower负责数据备份。

Follower中有个特殊集合 — ISR(in-sync replicas),当Leader故障,新选举出来的Leader会从ISR中选择。默认Leader的数据会异步复制给Follower,这样在Leader掉电或宕机时,Kafka会从Follower中消费消息,减少消息丢失的可能。

但因消息默认是异步从Leader复制到Follower,所以一旦Leader宕机,那些还没来得及复制到Follower的消息还是会丢失。为解决该问题,Kafka为生产者提供“acks”:

acks

当该选项被设置为“all”,Pro发的每条消息,除了发给Leader,还会发给所有ISR,且必须得到Leader和所有ISR的确认后,才被认为发送成功。这样,只有Leader和所有ISR都挂了消息才会丢失。

当设置“acks=all”,需同步执行1、3、4三个步骤,对消息生产的性能有很大影响,实际应用中需仔细权衡。

最佳实践
  • 若你就是要确保消息一条都不能丢,就你让开启MQ的同步刷盘,而应该用集群方案,可配置当所有ISR Follower都接收到消息,才返回成功
  • 若对丢消息有一定容忍度,则建议不部署集群,即使集群部署,也推荐配置只发送给一个Follower即可返回成功
  • 业务系统一般对消息丢失有一定容忍度,如红包系统,若红包消息丢了,只要后续给没发送红包的用户补偿发送即可!

1.3 在消费过程

一个Con消费消息的进度是记录在MQ集群中的,消费过程分为如下步骤:

  • 接收消息
  • 处理消息
  • 更新消费进度

接收消息,处理消息的过程都可能异常,如:

  • 接收消息时网络抖动,导致消息并未被正确接收
  • 处理消息时可能发生一些业务异常,导致处理流程未执行完成,这时若更新消费进度,这条失败的消息就永远不会被处理了,就算丢失了

所以,务必等到消息接收、处理完成后,才能更新消费进度,但这也会造成消息重复,比如某条消息在处理后,Con恰好宕机,就因未更新消费进度,所以当该Con重启后,还会重复消费这条消息。

2 保证消息只被消费一次

经过上面分析发现,为避免消息丢失,我们需要付出代价:

  • 性能损耗
  • 可能造成消息重复消费

性能损耗还能接受,因为一般业务系统只有在写请求时,才有发送MQ的操作,而一般系统的写请求的量级并不高。但消息一旦被重复消费,就会造成业务逻辑处理错误,如何避免消息重复消费问题呢?

完全避免消息重复发生真的很难,因为网络抖动、机器宕机和处理异常都难以避免,业界也并无成熟方法,只能将要求放宽,只要保证即使消费到了重复消息,从消费的最终结果来看和只消费一次是等同即可,即保证在消息的生产和消费的过程“幂等”。

2.1 幂等

多次执行同一个操作和执行一次操作,最终得到的结果是相同的。

若消费一条消息,要将库存-1,则若消费两条相同消息,库存-2,这就非幂等的。 而若:

  • 消费一条消息后,处理逻辑是将库存数置0
  • 或若当前库存数是10,则减1

这样消费多条消息时,所得结果相同,这就是幂等。

2.1.1 生产过程增加消息幂等

消息在生产、消费过程中都可能重复,所以要在生产、消费过程增加消息幂等性保证,这就能认为从“最终结果看”,消息实际上是只被消费一次

消息生产过程中,Kafka0.11和Pulsar都支持“producer idempotency”,即生产过程幂等性,这保证消息虽然可能在生产端产生重复,但最终在MQ存储时只会存一份。

实现原理

给每个Pro一个唯一ID,并为生产的每条消息赋予一个唯一ID,MQ服务端会存储:

生产者ID=》最后一条消息ID的映射。 当某Pro产生新消息,Broker比对消息ID是否与存储的最后一条ID一致:

  • 若一致,就认为是重复消息,Broker自动丢弃
2.1.2 消费过程增加消息幂等

消费端幂等性保证稍微复杂,可从通用层和业务*两个层面考虑:

通用层面

消息被生产时,使用发号器给其生成一个全局唯一消息ID。消息被处理后,将该ID存储在DB,在处理下一条消息前,先从DB查询该全局ID是否被消费:

  • 若被消费过,就放弃消费

生产端幂等保证 && 消费端通用层面的幂等保证,都是为每个消息生成唯一ID,然后在使用该消息时,先判断ID是否已存在,若存在,则认为消息已被使用。 这其实是一种标准的幂等实现方案:

代码语言:javascript
复制
// 判断ID是否存在
boolean isIDExisted = selectByID(ID);

if(isIDExisted) {
  // 存在则直接返回
  return;
} else {
  // 不存在,则处理消息
  process(message);
  // 存储ID
  saveID(ID);
}

但若消息在处理后,还没有来得及写DB,Con宕机了,重启后发现DB并无这条消息,还是会重复执行两次消费逻辑,这就要引入事务,保证消息处理和写入DB必须同时成功或失败,但这样消息处理成本更高,所以若对消息重复无苛刻要求,可直接使用这种通用方案,而不考虑引入事务。

业务层面

有很多处理方式,有种是使用乐观锁,如你的消息处理程序要给一个人的账户加钱: 给每个人的账户数据加个版本号:

  • Pro生产消息时,先查询该账户版本号,将版本号连同消息一起发给Broker
  • Con拿到消息和版本号后,在执行更新账户金额SQL时,带上版本号:
代码语言:javascript
复制
update user 
set amount = amount + 20,
	version=version+1
where userId=1
and version=1;

更新数据时,给数据加乐观锁,这样在消费第一条消息时,version值为1,SQL可执行成功且同时把version值改为2。 在执行第二条相同消息时,由于version值不再是1,所以该SQL不会再成功执行,这就实现了消息幂等。

3 总结

消息丢失可通过如下方案解决:

  • 生产端重试
  • 消息队列配置集群模式
  • 消费端合理处理消费进度

为解决消息丢失问题,通常会造成:

  • 性能损失
  • 消息重复

通过保证消息处理的幂等性可以解决消息的重复问题。

并非说消息丢失一定不能接受,在允许消息丢失情况下,MQ性能更好,方案实现复杂度也最低。像是日志处理等场景,日志意义在于排查系统问题,而系统问题几率不高,偶发丢几条日志也能接受。

方案设计都看业务要求,你不能把所有MQ都配置成防止消息丢失方式,也不能要求所有业务处理逻辑都要支持幂等性,这会给开发和运维带来额外负担。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 前言
  • 1 消息为何会丢失?
    • 1.1 在消息生产的过程
      • 1.1.1 解决方案
    • 1.2 在MQ中
      • ISR
      • acks
      • 最佳实践
  • 1.3 在消费过程
  • 2 保证消息只被消费一次
    • 2.1 幂等
      • 2.1.1 生产过程增加消息幂等
      • 2.1.2 消费过程增加消息幂等
  • 3 总结
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档