事情原委:由于业务量暴涨,kafka硬盘不够保存7天的数据,所以希望升级一下硬盘,能保存7天的日志,之后确认某一天进行升级,升级完了之后发现两三天之前的数据也被重新消费提交到数据库。发现有此问题之后,然后只能修复数据了,苦逼的修复线上最近3天的超过400G的数据。
关于上面出现的问题,需要查证为什么三天前已经提交过offset的数据还被重新消费?和阿里云的技术支持对接之后,他们让我给一下Kafka消费客户端配置。
clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
他们给的解释是OffsetOldest会拉到最旧的消息进行消费,而我们业务代码中的配置是拉取最旧的消息,并没有做幂等处理,所以造成重复消费。但是其实我这两三天前的被提交的offset也被消费了,我就很不接受他说的了,后面那个阿里云的技术工程师给了一些可能出现的情况会造成这个问题,具体我就这里先不阐述了。
为什么我两三天的数据已经被提交过offset的也被重复消费了?这个我总结了大概几个原因
我们先来看下kafka关于
auto.offset.reset
的参数值:latest和earliest的详解。
latest和earliest区别
1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
下面我们来验证一下:
这个是生成zzh_test的topic,生成a1到a11,然后我通过设置
clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
和
clusterCfg.Consumer.Offsets.Initial = sarama.OffsetNewest
两个去消费,根据不同的情况打印一下结果,结果如下。
关于offset提交的位移配置我们清楚了,所以上面的问题肯定不是这个参数造成的。我们重新回到"为什么我两三天的数据已经被提交过offset的也被重复消费了"这个问题。这个其实主要是阿里云kafka的硬盘升级涉及到数据的迁移,kafka机器是一台一台升级然后重启,造成大量的rebalance,触发到了Sarama Go客户端的OutOfRange机制,然后消费位点重置。
吐槽一下:关于阿里云的文档,这样的注意事项相当于什么都没说。
总结: