segmentio/kafka-go 是一款开源的golang kafka读写sdk,开源地址为:https://github.com/segmentio/kafka-go 。截止写文章时,这个开源代码库收获了3.3K的star,在很多公司内外部项目广泛使用。与 https://github.com/confluentinc/confluent-kafka-go 和 https://github.com/Shopify/sarama 一起,作为最常用的三个golang kafka sdk。
本文介绍在使用kafka-go的时候遇到的一个读写kafka数据丢失问题和问题定位解决的过程。
在实现一个数据分析平台的项目中,引入了kafka作为数据落地和中转的通道,抽象出来讲,就是使用kafka-go的writer将数据写入到kafka的指定topic,然后使用kafka-go的reader将数据从指定的topic读取出来返回给用户。
在项目运行一段时间后,用户反馈从kafka读出的数据条数少于投递到kafka的数据,即存在数据丢失的问题。
在用户反馈后,通过日志分析发现,writer.WriteMessages提示写入成功,但是http_proxy没有读到,确认消费组等配置正确后,查看消费组的消费情况,一切正常,没有发现问题。所以首先压测尝试复现问题。压测结果让我很震惊,简单的发1024条,收到1013条,丢失了11条,所以问题确定存在并且可以复现,数据丢失比例还很高。
在压测程序中将读写的数据打印出来,同时将reader读取到的kafka.Message结构中的partition和offset信息打印出来,通过awk处理压测程序的日志,发现offset不连续,使用kafka自带工具查看被跳过的offset信息: ./kafka-console-consumer.sh -bootstrap-server 10.10.0.7:9092 --topic topicName --partition 0 --offset 231131 --max-messages 1
发现可以读取到消息,至此可以确定,数据丢失发生在读取环节,而不是写入环节。
http_proxy中,为防止http阻塞,使用context.WithTimeout作为参数传给kafka-go reader读取消息,在超时后立刻返回。ctx的超时时间设置为100ms。
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.ReadTimeoutMs)*time.Millisecond)
msg, err := consumer.KReader.ReadMessage(ctx)
cancel()
return msg, err
下面是reader.ReadMessage的实现。
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
m, err := r.FetchMessage(ctx)
if err != nil {
return Message{}, err
}
if r.useConsumerGroup() {
if err := r.CommitMessages(ctx, m); err != nil {
return Message{}, err
}
}
return m, nil
}
应该可以发现,如果FetchMessage成功,但是CommitMessages假失败的话,数据将被丢失,kafka服务器得到的信息是消息已经被正常消费掉了。
CommitMessages需要与服务器通信,提交offset被消费的信息,与服务器通信的过程可能会失败,sdk内部会使用平方退让策略进行等待和重试,重试的时间最长为0+1*1*100 + 2*2*100 =500ms 。
跟踪CommitMessages代码,它的实现中会把请求写入到一个reader成员的队列中,然后通过另一个临时channel变量等待应答,在等待应答的时候,ctx可能超时提前返回,这时commit大概率还是会在三次重试之内成功的,并成功的应答写入到管道,但是调用方commitMessages已经因超时退出不再等待了。
到这里,问题已经清晰了,就是由于我们设置的ctx为100ms,导致发生FetchMessage成功但是CommitMessage在100ms后才成功。
读到这里,修复的方法已经很清晰了。你可能觉得只需要把ctx的时间改成500ms就可以了。如果是500ms,你发现仍然会丢数据,直观的,你可能认为500ms仍然丢数据是因为你的500ms先于sdk内部的500ms计时,所以会有数据丢失。
然后你改成550ms,你压测发现,还会有万分之一左右的数据丢失。你再看看代码,发现FetchMessage也使用到了ctx,而且在它的内部实现中,也是通过select chan 和ctx.Done()的方式来实现超时控制的,它也会花时间。留给CommitMessages的时间是不准确的。
说到这里,彻底的修复方法应该已经清楚了,就是重写这个实现,搞两个ctx传进来,分别控制时间,其中CommitMessages的ctx等待时间要略大于500ms,比如510ms。
解决后github提交bug单,发现已经有人提了,看来坑过不少人。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。