前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka中 DescribeLogDirs请求参数引起的一个问题

kafka中 DescribeLogDirs请求参数引起的一个问题

作者头像
陈猿解码
发布2023-02-28 15:11:49
5000
发布2023-02-28 15:11:49
举报
文章被收录于专栏:陈猿解码陈猿解码

某天,测试人员找到我,反馈说CI的kafka用例失败了,麻烦定位一下。

"麻烦先找下我们的小马甲——公共服务",这句话还没发出去,对方已经先把环境信息给发了过来。

想想应该是个小问题,索性直接先看了。

然后习惯性的登录到环境,先看下进程在不在、端口有没有监听、能不能生产消费后,发现一切都正常后,服务本身好像没什么毛病。这才问了下测试的兄弟,是什么用例失败,具体表现是怎样的?

测试:“喏,所有请求都正常,唯独这个请求一直超时”

代码语言:javascript
复制
DescribeLogDirsRequest ret = admin.describeLogDirs(brokerIds);
Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();

我:"那这个请求是针对具体哪个topic会超时吗?"

测试:"不,所有的topic,都会超时!"

再次查看了服务端的日志,发现完全没有任何错误信息,连个告警的信息都没有。

我:“你能再运行下这个用例吗?“

测试:“跑过了,还是超时”

我:“你确定你们执行用例的节点和kafka的网络是通的吗,不会是网络不通吧?“

测试:"不可能,所有用例都是在一个节点上执行的,topic的其他操作也都没问题,就这个超时!"

再次排除了可能有影响的因素后,发现问题仍旧存在,好像不得不分析下源码了,可简单看了下源码后,客户端就是发送一个请求,而服务端又完全没有任何错误信息。

而恰好在搜索请求DescribeLogDirs关键字时发现,有对应的(命令)封装类(LogDirsCommand),可以通过kafka-run-class.sh来调用。

于是直接调用脚本进行了测试:

代码语言:javascript
复制
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0 --describe --topic-list table_test1
Querying brokers for log directories information
Received log directory information from brokers 0
{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/opt/kafka","error":null,"partitions":[{"partition":"table_test1-1","size":80936040,"offsetLag":0,"isFuture":false}]}]}]}

唉,神奇了,调用没有报错,也正确返回了结果,为啥CI用例就失败了呢?

于是,进一步分析了下相关的参数:

代码语言:javascript
复制
--bootstrap-server: 指定kafka broker的地址(必需的参数)
--describe: 描述指定brokers的指定(topic分区)目录信息(必需的参数)
--broker-list:用于指定请求的kafka broker的ID列表(非必需的参数)
--topic-list: 指定的topic列表(必需的参数)

其中,需要注意的是"--broker-list"这个参数,如果不带该参数,则以元数据请求中的kafka集群信息为准,否则以指定的"--broker-list"为准。

那么,测试CI的那个问题难道是参数指定了不存在(或者已停止)的kafka节点?

带着疑问,再次敲了命令,这次在"--broker-list"中指定了一个实际不存在的ID。

代码语言:javascript
复制
[root@kafka-0 bin]# sh kafka-run-class.sh kafka.admin.LogDirsCommand --bootstrap-server 10.38.221.239:9092 --broker-list 0,9 --describe --topic-list table_test1
Querying brokers for log directories information
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
        at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:51)
        at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:37)
        at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

噢,这次的情况变成了超时,和CI失败的用例的现象是一致的。

至于为什么会超时,分析了下"KafkaAdminClient"的源码,主要逻辑为:对于请求中的每个BrokerID,都需要从元数据请求中找到对应的broker信息,然后分别向这些broker建立连接,并真正发送请求。否则一直在pending队列中,直到元数据请求信息能匹配到对应的信息或请求超时。

感觉问题基本清楚的同时,心里也有了一定的底气,再次询问了下测试兄弟,请求参数的值是什么?是不是填错了?经过测试兄弟的确认后,发现入参"broker-list"的值与实际部署的kafka节点数不一致,也就是说"broker-list"中有不存在的broker ID,最终导致了请求超时的问题。经过修改参数后,CI用例都成功通过了。

小结一下,本问题其实是一个很简单的问题,关键在于使用时需要清楚地知道对应参数的含义,否则就可能引起问题。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-11-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 陈猿解码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档