某天,测试人员找到我,反馈说CI的kafka用例失败了,麻烦定位一下。
"麻烦先找下我们的小马甲——公共服务",这句话还没发出去,对方已经先把环境信息给发了过来。
想想应该是个小问题,索性直接先看了。
然后习惯性的登录到环境,先看下进程在不在、端口有没有监听、能不能生产消费后,发现一切都正常后,服务本身好像没什么毛病。这才问了下测试的兄弟,是什么用例失败,具体表现是怎样的?
测试:“喏,所有请求都正常,唯独这个请求一直超时”
DescribeLogDirsRequest ret = admin.describeLogDirs(brokerIds);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
我:"那这个请求是针对具体哪个topic会超时吗?"
测试:"不,所有的topic,都会超时!"
再次查看了服务端的日志,发现完全没有任何错误信息,连个告警的信息都没有。
我:“你能再运行下这个用例吗?“
测试:“跑过了,还是超时”
我:“你确定你们执行用例的节点和kafka的网络是通的吗,不会是网络不通吧?“
测试:"不可能,所有用例都是在一个节点上执行的,topic的其他操作也都没问题,就这个超时!"
再次排除了可能有影响的因素后,发现问题仍旧存在,好像不得不分析下源码了,可简单看了下源码后,客户端就是发送一个请求,而服务端又完全没有任何错误信息。
而恰好在搜索请求DescribeLogDirs关键字时发现,有对应的(命令)封装类(LogDirsCommand),可以通过kafka-run-class.sh来调用。
于是直接调用脚本进行了测试:
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0 --describe --topic-list table_test1
Querying brokers for log directories information
Received log directory information from brokers 0
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/opt/kafka","error":null,"partitions":[{"partition":"table_test1-1","size":80936040,"offsetLag":0,"isFuture":false}]}]}]}
唉,神奇了,调用没有报错,也正确返回了结果,为啥CI用例就失败了呢?
于是,进一步分析了下相关的参数:
--bootstrap-server: 指定kafka broker的地址(必需的参数)
--describe: 描述指定brokers的指定(topic分区)目录信息(必需的参数)
--broker-list:用于指定请求的kafka broker的ID列表(非必需的参数)
--topic-list: 指定的topic列表(必需的参数)
其中,需要注意的是"--broker-list"这个参数,如果不带该参数,则以元数据请求中的kafka集群信息为准,否则以指定的"--broker-list"为准。
那么,测试CI的那个问题难道是参数指定了不存在(或者已停止)的kafka节点?
带着疑问,再次敲了命令,这次在"--broker-list"中指定了一个实际不存在的ID。
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0,9 --describe --topic-list table_test1
Querying brokers for log directories information
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:51)
at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:37)
at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
噢,这次的情况变成了超时,和CI失败的用例的现象是一致的。
至于为什么会超时,分析了下"KafkaAdminClient"的源码,主要逻辑为:对于请求中的每个BrokerID,都需要从元数据请求中找到对应的broker信息,然后分别向这些broker建立连接,并真正发送请求。否则一直在pending队列中,直到元数据请求信息能匹配到对应的信息或请求超时。
感觉问题基本清楚的同时,心里也有了一定的底气,再次询问了下测试兄弟,请求参数的值是什么?是不是填错了?经过测试兄弟的确认后,发现入参"broker-list"的值与实际部署的kafka节点数不一致,也就是说"broker-list"中有不存在的broker ID,最终导致了请求超时的问题。经过修改参数后,CI用例都成功通过了。
小结一下,本问题其实是一个很简单的问题,关键在于使用时需要清楚地知道对应参数的含义,否则就可能引起问题。