聊聊kafka 0.8 ConsumerFetcherManager的MaxLag指标

本文主要研究一下kafka0.8.2.2版本中ConsumerFetcherManager的MaxLag指标的统计。

问题

使用jmx查询出来的MaxLag跟使用ConsumerOffsetChecker查出来的总是不一样,几乎是jmx查出来的是0,但是实际是存在lag的。这里探究一下这个MaxLag的计算。

AbstractFetcherManager

kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherManager.scala

具体newGauge是调用KafkaMetricsGroup的方法

重点看这个计算逻辑,所有的数据都在fetcherThreadMap里头,key是BrokerAndFetcherId,value是AbstractFetcherThread,具体实例的类是ConsumerFetcherThread,它继承了AbstractFetcherThread

AbstractFetcherThread.fetcherLagStats

AbstractFetcherThread里头有个重要的字段,就是fetcherLagStats。

AbstractFetcherThread#FetcherLagMetrics

lag值的更新

lag值的更新在AbstractFetcherThread#processFetchRequest

fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset

这个是在AbstractFetcherThread#doWork方法里头

AbstractFetcherThread#doWork

ShutdownableThread#run

ConsumerOffsetChecker

kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala

主要是这个processPartition进行获取lag的逻辑

里头依赖的offsetMap获取逻辑如下

大体的逻辑就是

构造OffsetFetchRequest,获取consumer在topic的每个partition的消费的offset信息

构造OffsetRequest,获取topic的每个partition的logSize

logSize - consumer的offset = lag

小结

HighWaterMark

问题可能就在这个HighWaterMark:

ConsumerFetcherManager使用HighWaterMark - newOffset

ConsumerOffsetChecker调用SimpleConsumer的getOffsetsBefore,获取的是leaderEndOffset,即leaderEndOffset - newOffset

HighWaterMark取的是partition对应的ISR中最小的LEO,消费者最多只能消费到HW所在的位置

毫无疑问使用leader的offset肯定比使用HighWaterMark的数据要大,这样在replica延迟大的时候,表现更为明显

但是实际情况,即使消费端故意模拟耗时消费处理,也不见得这个数据变大,几乎总是0,因此问题还不是这个HighWaterMark

messages.lastOption

最后调试了一次,进入AbstractFetcherThread里头,看到这段数据的真实值,才恍然大悟

原来这里统计的是fetcher拉取的最新数据的offset与partition的HighWaterMark的差值,而拉取回来是放到一个内存队列里头让业务消费线程去消费的;它衡量的fetcher拉取的速度,而不是消费者消费的速度,要看消费者与生产者的lag值,就得使用ConsumerOffsetChecker去检查。

看来还真的不能望文生义,被坑了一天

doc

Kafka数据可靠性与一致性解析

AbstractFetcherThread

ConsumerFetcherManager MaxLag

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171229G0OAPO00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券