首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

4.kafka生产者&消费者

原生方式

无论是生产者还是消费者,引入的依赖都是kafka-clients,maven坐标如下:

生产者

kafka生产者对象就是KafkaProducer,构造方式如下:

消息

KafkaProducer构造好后,需要构造待发送的消息。kafka消息对象是ProducerRecord,根据源码可知,构造方式有多种:

创建消息:

下面构造一个最常用的ProducerRecord,只指定topic和value,由kafka去决定分区:

消费者

kafka消费者者对象就是KafkaConsumer,构造方式如下:

订阅并消费

集成spring方式

现在的项目一般标配了spring,通过spring集成kafka能够大大的方便业务开发。集成方式也比较简单,只需增加如下maven坐标:

生产者

spring集成kafka的生产者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):

发送消息

发送消息进行如下封装,封装后如果要发送kafka消息,只需一行代码即可,例如(obj就是要发送的消息对象):

消费者

spring集成kafka的消费者配置方式如下(部分属性配置通过properties解耦,用户使用时可以自定义):

消息处理

由上面的配置可以,指定的topic,其消息由OpenAccountKafkaListener处理,OpenAccountKafkaListener的核心源码如下:

显而易见,spring集成kafka后,消费端的简单的很多。另外,我们在没用使用spring集成kafka时可以拿到kafak消费者异步提交,也可以同步提交,但是集成spring后,如何实现呢?客官老爷们稍安勿躁,继续往下看。

消息处理(第二版)

深入发送消息

前面已经介绍了如何使用kafka生产者发送消息,以及如何用消费者接收消息,包括原生方式和spring集成方式,接下来我们跟踪源码看看消息在调用KafkaProducer中的send()后发送到kafka broker之前需要经过哪些处理。

拦截器

无论是同步调用send(),还是异步调用send()发送消息,最终都是调用下面的方法:

由这段代码可知,消息发送前第一步就是调用拦截器(如果有的话),拦截器可以对消息进行加工。后面会单独有一篇文章详细的分析拦截器。

接下来调用doSend()方法,源码如下:

获取集群信息

由doSend()方法源码可知,获取集群信息的源码就在waitOnMetadata()中,其源码如下:

通过这段源码分析可知,当我们构造KafkaProducer时指定的bootstrap.servers的值,不一定要和kafka集群信息完全一致,kafka-client可以通过参数bootstrap.servers指定的broker,然后从broker上获取到整个kafka集群元数据信息。但是即使是这样,参数bootstrap.servers也建议尽量完整。例如整个集群有3个broker,如果bootstrap.servers只指定了1个broker,那么当这个broker宕机后,虽然集群状态可用。但是

序列化

即经过拦截器链后另一个非常重要的操作:对key&value的序列化。核心代码是如下两行,对key的序列化,调用的方法是构造KafkaProducer时参数key.serializer指定的serializer,对value的序列化,调用的方法是构造KafkaProducer时参数value.serializer指定的serializer:

分区

接下来就是选择分区,核心代码如下:

总结

根据上面的分析可知,消息发送经过的几个重要过程按照先后顺序依次是:拦截器,获取元数据,序列化,选择分区。接下来的文章会一一详细分析这些必要重要的过程。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180802G00HAR00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券