聊聊kafka consumer offset lag increase异常

本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常。

查看consumer消费情况

发现消费者的offset与logSize差距太大,lag值都过10w了。

正常的情况

像这种lag差距比较少的,是正常的。

查看topic的partition

topic是4个分区,因此4个consumer来消费是正常的。

问题可能是消费者消费速度太慢,或者消费者消费异常。

排查

jstack -l pid

上面consume-1以及consume-2是具体的消费kafka的业务线程

error log

日志几乎没有消费到消息的痕迹,但是lag确又有那么多。

一开始看异常日志,找到这个,加上上面的jstack,看到ConsumerFetcherThread一直blocking在PartitionTopicInfo.enqueue,有点怀疑是rebalance引起的死锁或阻塞。之前jstack忘记加-l,无法看到死锁信息。网上查了一下,看到ConsumerFetcherThread deadlock?有提到类似的问题,不过看是14年的帖子的,kafka0.8.2.2版本应该是有修复了才对。紧接着看到

The fetchers are blocked on the queue since it is full, is your consumer

iterator stopped and hence not getting more data from it?

有点开始怀疑是否是自己的业务线程没有捕获异常挂了,因而就没有消费了。重启了下程序,看log,刷刷的消费消息。再jstack对比一下

对比一下,发现原来怀疑的ConsumerFetcherThread一直blocking在PartitionTopicInfo.enqueue在重启之后还是存在,因此可能是正常的。

在对比下consume-1与consume-2,发现了问题,有问题的线程堆栈没有看到自己的业务方法,而重启之后发现了业务方法。因此问题的原因渐渐明朗,就是因为没有catch异常导致。

业务方法

原来的业务方法大致如下

这里有个疑问就是线程异常没有catch的话,理论上再次new的线程,id应该递增才对,但是通过实验发现,走async的,抛异常之后,线程id都不变。

spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java

这里的threadCount没有看到调用decrement方法,因此如果线程异常挂掉,则理论上新补充的线程id应该是递增的。

/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java

调试发现completedAbruptly都是false,因而业务线程没有抛异常,这岂不是矛盾了。突然想起async注解的拦截,渐渐豁然开朗。

AsyncExecutionInterceptor

spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java

async注解通过AsyncExecutionInterceptor拦截,然后包了一层,处理了异常,因此线程池里头是没有异常的。

小结

使用kafka消费数据的时候,需要对offset的lag值进行实时监控,以确认消费速度是否ok

调用KafkaStream的iterator消费线程必须catch住异常,否则抛出了异常,就停止消费了。

doc

ConsumerFetcherThread deadlock?

Java Highlevel Consumer is stuck and the lag is increasing

  • 发表于:
  • 原文链接:http://kuaibao.qq.com/s/20171227G11MPK00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券