Kafka解惑之Old Producer(4)——Case Analysis

在前面三篇文章中详细了解了Old Producer的内容,本文主要通过一个实际应用案例来加深各位对Old Producer的理解。

TopicMetadataResponse中包含有broker的id、host、port,还有topic、topic中的partition、partition所对应的leader、AR和ISR等等,有兴趣的同学可以进一步翻阅kafka.api.TopicMetadataResponse这个类,主体代码只有30行左右,只要学一点Scala的构造函数相关的只是就能看懂。

我们回过头来再来进一步分析ClientUtils.fetchTopicMetadata这个方法,详细代码如下:

fetchTopicMetadata方法参数列表中brokers代表metadata.broker.list所配置的地址列表。可以看到方法中首先建立TopicMetadataRequest的请求,然后从brokers中随机挑选(做一个shuffle,然后从列表中的第一个开始取,也就是相当于随机)一个broker建立SyncProducer并发送TopicMetadataRequest请求,问题的关键就在这个随机挑选一个broker之上,如果正好随机到的是那台磁盘损毁而崩溃的机器,那么这个请求必定要等到设定的超时时间之后才能捕获异常:[ERROR] - [fetching topic metadata for topics [Set(hidden-topic)] from broker [ArrayBuffer(id:0,host:xxx.xxx.xxx.xxx,port:9092)] failed] ,进而再找到下一个broker重新发送TopicMetadataRequest请求。

上面提到了超时时间,这个超时时间是通过request.timeout.ms参数设定的,默认值为10000,也就是10s。具体指的是kafka.network.BlockingChannel中的channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)这段代码,参数connectTimeoutMs就是指request.timeout.ms。如果元数据的请求时打到那台崩溃的broker上的话,那么元数据的请求就要耗时10s以上,待元数据刷新后才能发送消息。这个request.timeout.ms参数才是导致文中开头有少量消息发送时延很大的原因。为了进一步验证结论是否正确,笔者将相关的类SyncProducer、BlockingChannel用Java重写了一遍,并测试出请求的耗时,当访问一个不存在的ip地址时,从发送请求到异常报出的耗时在10194ms。不过如果访问一个存在的ip地址时,但是没有kafka服务的话,从发送请求到返回的耗时只有1248ms,基本上减少了一个数量级,如果读者在遇到同样的问题时,不妨上线一台(随意一台能建立TCP连接的机器就好)与崩溃宕机的那台broker一样的ip地址的机器,上面无需运行kafka的服务,就能大大的降低发送消息的时延。这样可以流出更多的时间去定位、修复、重新上线那台崩溃的broker。

Old Producer拉取元数据信息以及发送消息是在同一个线程中的,这必然会引起局部消息的时延增大。不过在新版的KafkaProducer中,这些问题都已经迎刃而解,具体怎么处理,且看后面的文章分析。

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20180131G05M9X00?refer=cp_1026

扫码关注云+社区