首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

正确处理kafka多线程消费的姿势

最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息。通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步、 解耦、 削峰等几大好处,而且开始考虑最大的好处,可以实现架构的水平扩展,下游系统出现性能瓶颈,容器平台伸缩增加一些实例消费能力很快就提上来了,整体系统架构上不用任何变动。理论上,我们项目数据量再大整体架构上高可用都没有问题。在使用kafka过程中也遇到一些问题:

1. 消息逐渐积压,消费能力跟不上;

2.某个消费者实例因为某些异常原因挂掉,造成少量数据丢失的问题。

针对消费积压的问题,通过研究kafka多线程消费的原理,解决了消费积压的问题。所以,理解多线程的Consumer模型是非常有必要,对于我们正确处理kafka多线程消费很重要。

kafka多线程消费模式

说kafka多线程消费模式前,我们先来说下kafka本身设计的线程模型和ConcurrentmodificationException异常的原因。见官方文档:

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

ConcurrentmodificationException异常的出处见以下代码:

该方法acquire 会在KafkaConsumer的大部分公有方法调用第一句就判断是否正在同一个KafkaConsumer被多个线程调用。

"正在"怎么理解呢?我们顺便看下KafkaConsumer的commitAsync 这个方法就知道了。

我们看KafkaConsumer的release方法就是释放正在操作KafkaConsumer实例的引用。

通过以上的代码理解,我们可以总结出来kafka多线程的要点:

kafka的KafkaConsumer必须保证只能被一个线程操作。

下面就来说说,我理解的Kafka能支持的两种多线程模型,首先,我们必须保证操作KafkaConsumer实例的只能是一个线程,那我们要想多线程只能用在消费ConsumerRecord List上动心思了。下面列举我理解的kafka多线程消费模式。

模式一1个Consumer模型对应一个线程消费,最多可以有topic对应的partition个线程同时消费Topic。

模式二1个Consumer和多个线程消费模型,保证只有一个线程操作KafkaConsumer,其它线程消费ConsumerRecord列表。

注意

第二种模式其实也可以支持多个Consumer,用户最多可以启用partition总数个Consumer实例,然后,模式二跟模式一唯一的差别就是模式二在单个Consuemr里面是多线程消费,而模式一单个Consumer里面是单线程消费。

以上两种kafka多线程消费模式优缺点对比:

其实,模式二就是模式一情况在消费单个Consumer内部的特殊情况。

kafka多线程消费模式实现

关于多线程消费模式具体实现都是选择基于spring-kafka实现,毕竟站在巨人肩膀上,站的高望的远少加班,以下就是模式二的具体实现,模式一的话就是对模式二的简化,具体实现如下。

具体业务代码在BaseConsumer:

其中,closeConsumeExecutorService方法就是为了服务实例异常退出或者多机房上线kill的情况下,尽最大可能保证本次拉下来的任务被消费掉。最后,附上closeConsumeExecutorService实现,觉得RocketMQ源码这个实现的不错,就借用过来了,在此表示感谢。

下面回到使用kafka遇到的第二个问题,怎么解决消费者实例因为某些原因挂掉,造成少量数据丢失的问题。其实,通过我们上面的写法,已经不会出现因为某些原因服务实例(docker、物理机)挂掉,丢数据的情况。因为我们是先拉取后消费,消费完才手动提交kafka确认offset。实在还存在万一退出时候调用的closeConsumeExecutorService方法还没有消费完数据,表示这个时候offset肯定没有手动提交,这一部分数据也不会丢失,会在服务实例恢复了重新拉取消费。

以上的代码存在极小的可能瑕疵,比如,我们双机房切换上线,某机房实例有一部分数据没有消费,下次会重复消费的问题。其实,这个问题我们在业务上通过在配置中心配置一个标识符来控制,当改变标识符控制某些机房停止拉取kafka消息,这个时候我们就可以安全操作,不担心kafka没有消费完,下次重复消费的问题了。

以上自己使用kafka过程中一些心得体会,难免有所遗漏,感谢指出,知错能改,每天进步。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190826A0MZ7H00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券