序
本文主要解析一下遇到的一个kafka consumer offset lag不断增大的异常。
查看consumer消费情况
发现消费者的offset与logSize差距太大,lag值都过10w了。
正常的情况
像这种lag差距比较少的,是正常的。
查看topic的partition
topic是4个分区,因此4个consumer来消费是正常的。
问题可能是消费者消费速度太慢,或者消费者消费异常。
排查
jstack -l pid
上面consume-1以及consume-2是具体的消费kafka的业务线程
error log
日志几乎没有消费到消息的痕迹,但是lag确又有那么多。
一开始看异常日志,找到这个,加上上面的jstack,看到ConsumerFetcherThread一直blocking在PartitionTopicInfo.enqueue,有点怀疑是rebalance引起的死锁或阻塞。之前jstack忘记加-l,无法看到死锁信息。网上查了一下,看到ConsumerFetcherThread deadlock?有提到类似的问题,不过看是14年的帖子的,kafka0.8.2.2版本应该是有修复了才对。紧接着看到
The fetchers are blocked on the queue since it is full, is your consumer
iterator stopped and hence not getting more data from it?
有点开始怀疑是否是自己的业务线程没有捕获异常挂了,因而就没有消费了。重启了下程序,看log,刷刷的消费消息。再jstack对比一下
对比一下,发现原来怀疑的ConsumerFetcherThread一直blocking在PartitionTopicInfo.enqueue在重启之后还是存在,因此可能是正常的。
在对比下consume-1与consume-2,发现了问题,有问题的线程堆栈没有看到自己的业务方法,而重启之后发现了业务方法。因此问题的原因渐渐明朗,就是因为没有catch异常导致。
业务方法
原来的业务方法大致如下
这里有个疑问就是线程异常没有catch的话,理论上再次new的线程,id应该递增才对,但是通过实验发现,走async的,抛异常之后,线程id都不变。
spring-core-4.3.13.RELEASE-sources.jar!/org/springframework/util/CustomizableThreadCreator.java
这里的threadCount没有看到调用decrement方法,因此如果线程异常挂掉,则理论上新补充的线程id应该是递增的。
/Library/Java/JavaVirtualMachines/jdk1.8.0_71.jdk/Contents/Home/src.zip!/java/util/concurrent/ThreadPoolExecutor.java
调试发现completedAbruptly都是false,因而业务线程没有抛异常,这岂不是矛盾了。突然想起async注解的拦截,渐渐豁然开朗。
AsyncExecutionInterceptor
spring-aop-4.3.13.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java
async注解通过AsyncExecutionInterceptor拦截,然后包了一层,处理了异常,因此线程池里头是没有异常的。
小结
使用kafka消费数据的时候,需要对offset的lag值进行实时监控,以确认消费速度是否ok
调用KafkaStream的iterator消费线程必须catch住异常,否则抛出了异常,就停止消费了。
doc
ConsumerFetcherThread deadlock?
Java Highlevel Consumer is stuck and the lag is increasing
领取专属 10元无门槛券
私享最新 技术干货