首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go sarama拾遗:有趣的超时

go sarama拾遗:有趣的超时

作者头像
超级大猪
发布2020-11-19 10:46:35
3.7K1
发布2020-11-19 10:46:35
举报

先说结论

这篇文章太细太长了。先说结论。

在sarama进行producer.SendMessage重试的时候,会重新创建brokerProducer,这会重新dial。

在dial的时候,会硬编码重试3次(总共4次)。如果远端不可用,等待返回错误的时间将是: conf.Net.WriteTimeout + 4* conf.Net.DialTimeout

从需求出发

最近接到一个需求,大意是,使用kafka producer尝试写入目标kafka,如果3秒后还未响应(很可能目标kafka已经挂了),则退出流程告警,不再处理。

第一版配置

在sarama的配置中,很自然的想起了带Timeout和Retry的配置。

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 0                    // 重新发送的次数
config.Producer.Timeout = time.Millisecond * 10  // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置

config.Net.WriteTimeout = time.Second * 3

...
kafkaProducer, err := sarama.NewSyncProducer(addresses, config)

在本地用docker搭建一个kafka。 参考:https://juejin.im/entry/6844903829624848398

起动测试程序写消息,写一会后,使用iptables把9092端口封了。对,就是这么暴力:

iptables -A OUTPUT -p tcp --dport 9092 -j DROP
# 恢复 iptables -D OUTPUT -p tcp --dport 9092 -j DROP

在封锁端口之后,程序3秒后退出。符合预期。

Producer 和 Retry 的心路历程

本来需求做完,就应该开开心心的去一张一弛劳逸结合(简称摸鱼),但是,本人向来很有探索的精神。 我想,只试一次,就把对方的kafka 踢了,人生真是太残酷了,恻隐之心下,于是将Producer.Retry.Max配置改成了1。

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 1                    // 重新发送的次数,现在改成1
config.Producer.Timeout = time.Millisecond * 10  // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置

config.Net.WriteTimeout = time.Second * 3

启动测试。先正常写入消息(这里很重要),然后封锁端口。 本来预计6秒左右,程序就应该退出了。此时诡异的现象发生,程序直接卡在那里。直到将近两分多钟后。才开始报错。

消息是怎么发送的

事出反常必有妖,马上谷歌一下。不得不吐槽的是,现在中文的技术文档真的是鱼龙混杂,天下文章一大抄。没有找到啥有价值的资料。最后只能自己动手看源码了。

这里使用goland的分析工具。 首先右键寻找Retry.Max 的引用。一开始很奇怪,只在async_producer.go找到了这个字段的引用。

点进去一看,原来SyncProducer就是AsyncProducer的包装而已。所以只需要看AsyncProducer的实现了。

这里先提一句。AsyncProducer有一个Input channel,所有想发送的数据,往这个chan里扔就行了。通过查找Input的引用,很快找到了dispatcher这个函数。在这里,处理整个producer.Input()的消息。

通过罗永浩式的地狱般的流程,跟踪msg的路程。山路十八弯找到了partitionProducer.dispatch这里先敲黑板,记住这个地方

最终有如下代码:

pp.brokerProducer.input <- msg

这条消息最终到了brokerProducer中。

那这个brokerProducer是什么呢?跳转到代码newBrokerProducer中。发现这两行。这里有看头。

            request := set.buildRequest()
            response, err := broker.Produce(request)  // 天啊。绕到这终于发送了。
            responses <- &brokerProducerResponse{  // 如果失败,则把消息和错误丢到responses里
                set: set,
                err: err,
                res: response,
            }

Produce,终于要发送数据了。先看看发送,Produce一路点开,进入到write函数中。

func (b *Broker) write(buf []byte) (n int, err error) {
    if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
        return 0, err
    }
    return b.conn.Write(buf)
}

这里,终于看到了第一个配置conf.Net.WriteTimeout。我感觉我马上离真相很近了。

这里的b.conn,通过反查得知:

        dialer := conf.getDialer()
        b.conn, b.connErr = dialer.Dial("tcp", b.addr)

conn是一个tcp连接。dialer的代码在这,从这里开始,用的就是go的标准库:

        return &net.Dialer{
            Timeout:   c.Net.DialTimeout,
            KeepAlive: c.Net.KeepAlive,
            LocalAddr: c.Net.LocalAddr,
        }

看看重试逻辑

发送的代码就先看到这里,这里说下错误处理,也就是retry的逻辑。

当write如果返回了错误,则写到responses中。反查responses, 一路点开,在brokerProducer.run里面,发现了response的处理逻辑。

最终调用asyncProducer.retryMessage

它会判定是否重试,如果重试的话,就会把msg放到retries这个channel中,继续查找,找到了retryHandler,在这个函数中,消息又被扔回了原来的 AsyncProducer.input

然后,上面的地狱般的流程,又会重新被执行一次。

然而,回到之前的问题。为什么在retry的时候,会卡死。似乎还是没有解答。当看代码没有思路的时候,加点打印,就成了最后的法宝。

再战,永不言败

通过我聪明的大脑袋一思考,发现,绝对是之前看漏了什么东西。我决定把sarama的代码clone下来,加点日志。看看它的执行流程。

git clone https://github.com/Shopify/sarama.git

然后,在go.mod中,把引用的module解析到本地的代码。 go.mod

...
require(
....
)
replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama

大功告成。

出于直觉,我觉得应该先在getDialer中加点日志,很可能和 config.Net.DialTimeout 这个配置有关。 打开getDialer函数,打一下调用栈。

func (c *Config) getDialer() proxy.Dialer {
    fmt.Println("getDialer run")
    stack := make([]byte, 1024*8)
    var stackStr = string(stack[:runtime.Stack(stack, false)])
    fmt.Println(stackStr)
    .........
}

令人惊喜的事情发生了。 令人惊喜的事情发生了。 令人惊喜的事情发生了。

在发生失败后,进入重试流程时。getDialer函数重复的被执行

getDialer调用栈:

而此时,因为config.Net.DialTimeout使用的是默认的配置,所以,在重试的时候把卡在这。

事情到了这里,不妨继续深挖下,为什么会重试getDialer。因为,我是在已经建立连接之后,中途将端口封锁的。它的重试机制在哪里呢。

定位到调用栈的最后一行broker.go:161,发现调用了匿名函数,所以如法炮制,找到这个匿名函数的调用处Open,再打下调用栈。

调用栈结果:

找到async_producer.go:517,发现居然是上面的dispatch函数。

顺着调用栈从下往上看,

因为此时是重试,msg.retries被设为1,大于pp.highWatermark(默认0) newHighWatermark函数中,brokerProducer被设置为了nil。

原来,这就是之前看漏的地方。

因为brokerProducer被设置为了nil,下面需要进行brokerProducer的重新配置。

updateLeader函数:

看到pp.breaker.Run,这里的breaker就是一个重试器

反查它的定义:

        breaker:    breaker.New(3, 1, 10*time.Second),

这里,设定了3次重试,也就是说必须调4次getDialer失败,才会收到错误。而getDialer,又会因为被config.Net.DialTimeout控制,默认时间是30秒。这会造成长期的等待。

窗外,夜色正美。 凉风拂面,虽然已经到了深夜,但一种奇妙的乐趣让人心旷神怡。站在大厦之上,仰望深沉的天空。

-- 公昔登临,想诗境满怀,酒杯在手 -- 我来依旧,见青山对面,明月当楼

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-11-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 先说结论
  • 从需求出发
  • 第一版配置
  • Producer 和 Retry 的心路历程
  • 消息是怎么发送的
  • 看看重试逻辑
  • 再战,永不言败
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档