前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka-go 读取kafka消息丢失数据的问题定位和解决

kafka-go 读取kafka消息丢失数据的问题定位和解决

原创
作者头像
谢盼
修改2021-04-06 17:47:35
7K1
修改2021-04-06 17:47:35
举报
文章被收录于专栏:视频AI

kafka-go简介

segmentio/kafka-go 是一款开源的golang kafka读写sdk,开源地址为:https://github.com/segmentio/kafka-go 。截止写文章时,这个开源代码库收获了3.3K的star,在很多公司内外部项目广泛使用。与 https://github.com/confluentinc/confluent-kafka-gohttps://github.com/Shopify/sarama 一起,作为最常用的三个golang kafka sdk。

本文介绍在使用kafka-go的时候遇到的一个读写kafka数据丢失问题和问题定位解决的过程。

背景

在实现一个数据分析平台的项目中,引入了kafka作为数据落地和中转的通道,抽象出来讲,就是使用kafka-go的writer将数据写入到kafka的指定topic,然后使用kafka-go的reader将数据从指定的topic读取出来返回给用户。

抽象使用场景
抽象使用场景

故障

在项目运行一段时间后,用户反馈从kafka读出的数据条数少于投递到kafka的数据,即存在数据丢失的问题。

定位过程

1.压测确定问题存在

在用户反馈后,通过日志分析发现,writer.WriteMessages提示写入成功,但是http_proxy没有读到,确认消费组等配置正确后,查看消费组的消费情况,一切正常,没有发现问题。所以首先压测尝试复现问题。压测结果让我很震惊,简单的发1024条,收到1013条,丢失了11条,所以问题确定存在并且可以复现,数据丢失比例还很高。

2.确认丢失发生的环节

在压测程序中将读写的数据打印出来,同时将reader读取到的kafka.Message结构中的partition和offset信息打印出来,通过awk处理压测程序的日志,发现offset不连续,使用kafka自带工具查看被跳过的offset信息: ./kafka-console-consumer.sh -bootstrap-server 10.10.0.7:9092 --topic topicName --partition 0 --offset 231131 --max-messages 1

发现可以读取到消息,至此可以确定,数据丢失发生在读取环节,而不是写入环节。

3.跟踪分析代码找到问题原因

http_proxy中,为防止http阻塞,使用context.WithTimeout作为参数传给kafka-go reader读取消息,在超时后立刻返回。ctx的超时时间设置为100ms。

代码语言:javascript
复制
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.ReadTimeoutMs)*time.Millisecond)
msg, err := consumer.KReader.ReadMessage(ctx)
cancel()
return msg, err

下面是reader.ReadMessage的实现。

代码语言:javascript
复制
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
   m, err := r.FetchMessage(ctx)
   if err != nil {
      return Message{}, err
   }

   if r.useConsumerGroup() {
      if err := r.CommitMessages(ctx, m); err != nil {
         return Message{}, err
      }
   }

   return m, nil
}

应该可以发现,如果FetchMessage成功,但是CommitMessages假失败的话,数据将被丢失,kafka服务器得到的信息是消息已经被正常消费掉了。

CommitMessages需要与服务器通信,提交offset被消费的信息,与服务器通信的过程可能会失败,sdk内部会使用平方退让策略进行等待和重试,重试的时间最长为0+1*1*100 + 2*2*100 =500ms 。

跟踪CommitMessages代码,它的实现中会把请求写入到一个reader成员的队列中,然后通过另一个临时channel变量等待应答,在等待应答的时候,ctx可能超时提前返回,这时commit大概率还是会在三次重试之内成功的,并成功的应答写入到管道,但是调用方commitMessages已经因超时退出不再等待了。

到这里,问题已经清晰了,就是由于我们设置的ctx为100ms,导致发生FetchMessage成功但是CommitMessage在100ms后才成功。

修复方法

读到这里,修复的方法已经很清晰了。你可能觉得只需要把ctx的时间改成500ms就可以了。如果是500ms,你发现仍然会丢数据,直观的,你可能认为500ms仍然丢数据是因为你的500ms先于sdk内部的500ms计时,所以会有数据丢失。

然后你改成550ms,你压测发现,还会有万分之一左右的数据丢失。你再看看代码,发现FetchMessage也使用到了ctx,而且在它的内部实现中,也是通过select chan 和ctx.Done()的方式来实现超时控制的,它也会花时间。留给CommitMessages的时间是不准确的。

说到这里,彻底的修复方法应该已经清楚了,就是重写这个实现,搞两个ctx传进来,分别控制时间,其中CommitMessages的ctx等待时间要略大于500ms,比如510ms。

其他

解决后github提交bug单,发现已经有人提了,看来坑过不少人。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • kafka-go简介
  • 背景
  • 故障
  • 定位过程
    • 1.压测确定问题存在
      • 2.确认丢失发生的环节
        • 3.跟踪分析代码找到问题原因
        • 修复方法
        • 其他
        相关产品与服务
        Elasticsearch Service
        腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档