聊聊kafka client chunkQueue与MaxLag值

前面一篇文章讨论了ConsumerFetcherManager的MaxLag与ConsumerOffsetChecker的lag值的区别。但是关于MaxLag的值还没有讲的太透彻,这里再深入一下,如何让ConsumerFetcherManager的MaxLag有值。

AbstractFetcherThread#processFetchRequest

kafka_2.10-0.8.2.2-sources.jar!/kafka/server/AbstractFetcherThread.scala

值得注意,这里构建了fetchRequest

这里的partitionMap,key是TopicAndPartition,value就是本地最大的offset

每次拉取的时候,以本地已经拉取的最大值,还有拉取大小构造fetchRequest

FetchRequest

kafka_2.10-0.8.2.2-sources.jar!/kafka/api/FetchRequest.scala

可以看到这里的offset与fetchSize决定了这个fetcher从broker拉取数据的开始位置和拉取数据的条数。

ConsumerFetcherThread

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala

这里使用的fetchSize来自config.fetchMessageMaxBytes

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerConfig.scala

这个fetchSize默认是1024 * 1024,也就是1048576,即每次fetch的时候拉取1048576这么多条。

AbstractFetcherThread#processFetchRequest

ConsumerFetcherThread#processPartitionData

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ConsumerFetcherThread.scala

PartitionTopicInfo#enqueue

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/PartitionTopicInfo.scala

如果数据为空,则不放进队列

chunkQueue大小

kafka_2.10-0.8.2.2-sources.jar!/kafka/consumer/ZookeeperConsumerConnector.scala

queue在这里创建了,大小为config.queuedMaxMessages

默认队列最大只能有2个FetchedDataChunk

而每个FetchedDataChunk里头最大的消息数目就是fetchSize大小也就是10241024

也就是说每个消费线程的chunkQueue里头默认最大的消息数目为21024*1024

当超过这个数目的时候,enquue就会阻塞,这样就形成了对整个fetch的拉取速度的控制。

ConsumerFetcherManager的MaxLag

那么每次只拉10条消息,假设目前的lag如下

拉取一次之后

这里的nextOffset = offset + 1,也就是拉取回来的最大offset+1 = 259,hw的话是8702,那么lag值就是8702-259=8443

这里为了复现,让消费线程拉取一条之后抛异常退出

小结

生产环境注意根据消息大小以及环境内存等对如下参数进行配置,否则很容易引发OOM

另外关于ConsumerFetcherManager的MaxLag,只有在上面两个参数合理设置的情况下,才能对监控有点点帮助()。从实际场景来看,还是一般比较少改动参数的话,那么还是得以ConsumerOffsetChecker的lag值做消费者消费滞后的监控才准确。

doc

ConsumerFetcherManager MaxLag

apache kafka系列之jmx监控指标参数

Kafka源码分析 Consumer(2) Fetcher

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171230G0B5JG00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券