首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

深入Kafka系列(一)Producer开发

Kafka是目前业界最经典的消息引擎,之前在学习工作中只是对基本的原理和使用方法有一点了解。寒假稍微有一点空余时间,想对Kafka有一个更深入的了解。

本系列是对胡夕《Apache Kafka实战》一书的学习笔记,会跟随作者的安排,逐章对Kafka的设计细节做介绍。《Apache Kafka实战》这本书写的很棒,一看就是行家用心写出来的。如果想深入学习Kafka的同学可以考虑入手一本。

本系列直接从Producer开发开始。Kafka自0.9.0.0版本后,就启用了新设计的Java版的Producer取代旧的Scala版本的Producer,本文介绍的也是新版本。

1. Producer工作流程

Producer的工作流程可以简单概述为三步:

封装消息:使用用户主线程将待发送的消息封装成ProducerRecord类实例,在完成序列化后发送给partitioner。

完成分区:partitioner根据分区策略(后文介绍)对消息进行分区,将同一分区的消息发送给某一块内存缓存区。

完成发送:使用另外一个I/O线程实时从缓存区域中提取消息封装成一个batch,统一发送给对应的broker。

Producer需要完成的任务就是对消息进行分区,以及确定分区的leader。

2. 构造Producer

2.1 代码实例

图1

基本的Producer构造代码如图1所示,基本步骤有:

构造Properties对象,指定必要的参数包括bootstrap.servers,key.serializer,value.serializer

构造KafkaProducer对象

构造ProducerRecord对象,必须指定的参数有topic,value

调用KafkaProducer的send,有两个方种发送:同步发送和异步发送+回调

关闭KafkaProducer

2.2 详细解释

构造Properties对象:使用Properties指定参数,有

bootstrap.servers:一组broker列表,包含対

key/value serializer:发送给broker的消息必须是字节数组,所以需要指定key和value的序列化格式,一般都是StringSerializer

acks:指定在producer发送响应前,leader broker必须确保已经成功写入该消息的副本数,分别为0,1,all。

0表示producer发送消息后不等broker的返回结果,直接进入下一条的发送

1表示只需要消息写进broker leader的日志就可以

-1/all 表示必须所有副本都完成消息持久化后,才发送结果给producer,开始下一条的发送

buffer.memory:用于缓存消息的缓冲区大小,默认32MB。刚才在工作流程中也讲过,producer会把发给同一分区的消息缓存在缓冲区内,等待I/O进程适时发送。这部分缓冲区的大小由此参数设置。

compression.type:消息是否压缩,默认是None。目前支持的压缩方式有三种,其中效果最好的是LZ4。压缩需要格外消耗CPU资源,仅在带宽资源不足,prodocer的CPU资源充足时考虑压缩。

retries:当消息发送失败时,producer自动重新发送消息的次数。

batch.size:最重要的调优参数之一。还是刚才所说的,producer将同一分区的消息缓存在缓冲区内,并封装成一个一个batch,当满了以后会由I/O进程发送。batch大小就很重要,太小的话一次发送请求发送的消息数太少;太大的话对内存压力又很大。默认的大小是16kb,适当的提高此参数可以提高吞吐量。

linger.ms:batch没满时,也可能被提前发送。linger.ms参数可以控制消息发送延迟行为,默认是0,即消息需要立即被发送,无需关心batch是否填满。这样设计大多数情况下是合理的,但是会拉低吞吐量。

还有其他的参数,自己看官网吧。

构造KafkaProducer对象:在Properties里完成参数设置后,就可以构造KafkaProducer对象了。

构造ProducerRecord对象:需要将topic和value信息包装在ProducerRecord对象中,key可选。

发送消息:Kafka发送消息有两种方式:

异步发送:send方法会返还一个Java Future对象供用户获取发送结果。根据回调的参数实现异步发送以及对发送结果的响应。

同步发送:调用send().get()方法可以实现同步发送的效果,即无限等待broker返还给producer的结果。

关闭producer:producer占用了大量系统资源,使用完后必须关闭。

3. 消息分区策略

3.1 默认的分区策略

当消息指定key时,使用murmur2算法计算哈希值,然后由哈希值对总分区数求模后找到目标分区号,此时完成分区操作,相同的key的所有消息分配到相同的分区。

当没有指定key是,partitioner根据轮询的方式确保所有分区均匀。

3.2 自定义分区

如果默认的分区策略无法满足要求时,可以实现自定义分区。需要完成两个步骤:

创建自定义分区类,实现Partitioner接口

Properties设置partitioner.class参数

3.3 自定义分区实例

图2

这个自定义分区的实例实现了,当key包含"audit"字符串时,该消息发送到最后一个分区,其他消息按照随机的策略发送到其他分区。

4. producer拦截器

4.1 拦截器

拦截器interceptor可以实现消息发送前、producer回调逻辑前对消息做一些定制化需求,比如修改消息等。interceptor通过接口ProducerInterceptor实现,主要有两个方法:

onSend:运行在用户主线程中,在消息被序列化以计算分区前调用,用户可以对消息做任何修改。

onAcknowledgement:运行在I/O线程中,在消息被应答前或消息发送失败时调用,通常发生在producer回调逻辑触发之前。

4.2 实例

4.2.1 onSend实例

图3

图3自定义了一个拦截器,在消息发送之前对消息做了修改,在value值添加了时间戳。然后需要在Properties参数配置中按图4的方式添加,结果如图5所示。

图4

图5

4.2.2onAcknowledgement实例

图6

图6的拦截器实现了消息发送后更新”发送成功消息数“和“发送失败消息数”。添加方式一样,如图7,此时可以构成双interceptor的拦截链。结果如图8。

图7

图8

在《Apache Kafka实战》中,还介绍了消息序列化、无消息丢失配置、消息压缩等关于Kafka Producer的介绍,需要的同学可以参阅。

欢迎阅读、订阅、转载、收藏

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20190114G065C900?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券