这篇文章太细太长了。先说结论。
在sarama进行producer.SendMessage
重试的时候,会重新创建brokerProducer
,这会重新dial。
在dial的时候,会硬编码重试3次(总共4次)。如果远端不可用,等待返回错误的时间将是: conf.Net.WriteTimeout + 4* conf.Net.DialTimeout
最近接到一个需求,大意是,使用kafka producer尝试写入目标kafka,如果3秒后还未响应(很可能目标kafka已经挂了),则退出流程告警,不再处理。
在sarama的配置中,很自然的想起了带Timeout和Retry的配置。
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 0 // 重新发送的次数
config.Producer.Timeout = time.Millisecond * 10 // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置
config.Net.WriteTimeout = time.Second * 3
...
kafkaProducer, err := sarama.NewSyncProducer(addresses, config)
在本地用docker搭建一个kafka。 参考:https://juejin.im/entry/6844903829624848398
起动测试程序写消息,写一会后,使用iptables把9092端口封了。对,就是这么暴力:
iptables -A OUTPUT -p tcp --dport 9092 -j DROP
# 恢复 iptables -D OUTPUT -p tcp --dport 9092 -j DROP
在封锁端口之后,程序3秒后退出。符合预期。
本来需求做完,就应该开开心心的去一张一弛劳逸结合(简称摸鱼),但是,本人向来很有探索的精神。
我想,只试一次,就把对方的kafka 踢了,人生真是太残酷了,恻隐之心下,于是将Producer.Retry.Max
配置改成了1。
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 1 // 重新发送的次数,现在改成1
config.Producer.Timeout = time.Millisecond * 10 // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置
config.Net.WriteTimeout = time.Second * 3
启动测试。先正常写入消息(这里很重要),然后封锁端口。 本来预计6秒左右,程序就应该退出了。此时诡异的现象发生,程序直接卡在那里。直到将近两分多钟后。才开始报错。
事出反常必有妖,马上谷歌一下。不得不吐槽的是,现在中文的技术文档真的是鱼龙混杂,天下文章一大抄。没有找到啥有价值的资料。最后只能自己动手看源码了。
这里使用goland的分析工具。 首先右键寻找Retry.Max 的引用。一开始很奇怪,只在async_producer.go找到了这个字段的引用。
点进去一看,原来SyncProducer就是AsyncProducer的包装而已。所以只需要看AsyncProducer的实现了。
这里先提一句。AsyncProducer有一个Input channel,所有想发送的数据,往这个chan里扔就行了。通过查找Input的引用,很快找到了dispatcher
这个函数。在这里,处理整个producer.Input()的消息。
通过罗永浩式的地狱般的流程,跟踪msg的路程。山路十八弯找到了partitionProducer.dispatch
。这里先敲黑板,记住这个地方。
最终有如下代码:
pp.brokerProducer.input <- msg
这条消息最终到了brokerProducer中。
那这个brokerProducer是什么呢?跳转到代码newBrokerProducer
中。发现这两行。这里有看头。
request := set.buildRequest()
response, err := broker.Produce(request) // 天啊。绕到这终于发送了。
responses <- &brokerProducerResponse{ // 如果失败,则把消息和错误丢到responses里
set: set,
err: err,
res: response,
}
在Produce
,终于要发送数据了。先看看发送,Produce
一路点开,进入到write
函数中。
func (b *Broker) write(buf []byte) (n int, err error) {
if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
return 0, err
}
return b.conn.Write(buf)
}
这里,终于看到了第一个配置conf.Net.WriteTimeout
。我感觉我马上离真相很近了。
这里的b.conn,通过反查得知:
dialer := conf.getDialer()
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
conn是一个tcp连接。dialer的代码在这,从这里开始,用的就是go的标准库:
return &net.Dialer{
Timeout: c.Net.DialTimeout,
KeepAlive: c.Net.KeepAlive,
LocalAddr: c.Net.LocalAddr,
}
发送的代码就先看到这里,这里说下错误处理,也就是retry的逻辑。
当write如果返回了错误,则写到responses
中。反查responses
, 一路点开,在brokerProducer.run
里面,发现了response的处理逻辑。
最终调用asyncProducer.retryMessage
:
它会判定是否重试,如果重试的话,就会把msg放到retries这个channel中,继续查找,找到了retryHandler
,在这个函数中,消息又被扔回了原来的 AsyncProducer.input
:
然后,上面的地狱般的流程,又会重新被执行一次。
然而,回到之前的问题。为什么在retry的时候,会卡死。似乎还是没有解答。当看代码没有思路的时候,加点打印,就成了最后的法宝。
通过我聪明的大脑袋一思考,发现,绝对是之前看漏了什么东西。我决定把sarama的代码clone下来,加点日志。看看它的执行流程。
git clone https://github.com/Shopify/sarama.git
然后,在go.mod中,把引用的module解析到本地的代码。 go.mod
...
require(
....
)
replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama
大功告成。
出于直觉,我觉得应该先在getDialer
中加点日志,很可能和 config.Net.DialTimeout 这个配置有关。
打开getDialer
函数,打一下调用栈。
func (c *Config) getDialer() proxy.Dialer {
fmt.Println("getDialer run")
stack := make([]byte, 1024*8)
var stackStr = string(stack[:runtime.Stack(stack, false)])
fmt.Println(stackStr)
.........
}
令人惊喜的事情发生了。 令人惊喜的事情发生了。 令人惊喜的事情发生了。
在发生失败后,进入重试流程时。getDialer
函数重复的被执行。
getDialer调用栈:
而此时,因为config.Net.DialTimeout
使用的是默认的配置,所以,在重试的时候把卡在这。
事情到了这里,不妨继续深挖下,为什么会重试getDialer。因为,我是在已经建立连接之后,中途将端口封锁的。它的重试机制在哪里呢。
定位到调用栈的最后一行broker.go:161
,发现调用了匿名函数,所以如法炮制,找到这个匿名函数的调用处Open
,再打下调用栈。
调用栈结果:
找到async_producer.go:517
,发现居然是上面的dispatch
函数。
顺着调用栈从下往上看,
因为此时是重试,msg.retries
被设为1,大于pp.highWatermark
(默认0)
newHighWatermark
函数中,brokerProducer被设置为了nil。
原来,这就是之前看漏的地方。
因为brokerProducer被设置为了nil,下面需要进行brokerProducer的重新配置。
在updateLeader
函数:
看到pp.breaker.Run
,这里的breaker
就是一个重试器。
反查它的定义:
breaker: breaker.New(3, 1, 10*time.Second),
这里,设定了3次重试,也就是说必须调4次getDialer
失败,才会收到错误。而getDialer
,又会因为被config.Net.DialTimeout
控制,默认时间是30秒。这会造成长期的等待。
窗外,夜色正美。 凉风拂面,虽然已经到了深夜,但一种奇妙的乐趣让人心旷神怡。站在大厦之上,仰望深沉的天空。
-- 公昔登临,想诗境满怀,酒杯在手 -- 我来依旧,见青山对面,明月当楼