前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >producer参数---Kafka从入门到精通(七)

producer参数---Kafka从入门到精通(七)

作者头像
用户9919783
发布2022-12-14 17:47:46
2770
发布2022-12-14 17:47:46
举报

上篇文章说了,kafka需要先构造properties指定server和kafka集群,key 和 value用stringSerialize序列化,通过producer发送send,需要records参数指定topic和value,之后发送消息,有异步和同步,最后关闭。

构造producer---Kafka从入门到精通(六)

一、producer参数

除了前面说的 三个servers,和key.serializer和value.serializer外,java版本还有很多其他重要参数。

Acks:

Acks参数控制producer生产消息的持久化(durability)。对于任何producer而言,kafka在乎的是“已提交”消息持久化,一旦消息被成功提交,那么只有任何一个保存该消息的副本存在数据,则不会丢失。经常碰到用户抱怨kafka的producer消息丢失,其实这里概念混淆,对于这些消失的消息并没有成功写入kafka,换句话说,他们并没有成功提交。当然,producer的api提供了回调机制解决发送失败的请求数据。

Producer发送消息给kafka集群时,这条消息会指定topic分区leader所在的broker,producer等待从该leader broker返回消息写入结果,(并不是无限等待,有超时时间),以确定消息发送成功。当收到消息后,producer才可以继续发送消息,kafka和关系型数据库的事务类型,永远不会消费未提交的数据。

显然,leader broker何时发送结果返回给producer,这个关系到整个kafka的吞吐量,所以这个参数就是为了控制这件事,acks有三个参数,0、1、-1(all)。

Acks =0:设置成0 代表producer完全不理睬leader broker端处理结果,此时producer发送一条之后立马发送下一条消息,不需要等待leader broker返回结果,因此这种情况下的producer回调也会失去作用,所以acks的时候,并不会保证消息发送成功,但这种情况下 吞吐量是最高的。

Acks=-1或者all:表示发送消息后,leader broker不仅会把消息写入本地日志,同时也会把ISR中其他副本都成功写入他们各自的本地日志,才发送消息给producer,显然只要在ISR中有一个副本存处于存货状态,这时候消息就肯定不会丢失,达到最高持久化。

Acks=1:这是一种折中方式,当leader broker把消息写入本地日志后,则会直接返回给producer,无须等待ISR中的其他副本写入该消息,所以只要leader broker没有宕机,数据就不会丢失。这种方法即可以保证消息持久性,同时也可以保证吞吐量。

不光在java配置文件可以设置,在properties也可以设置,值得注意的是,如果在properties里设置,该参数是字符串,否则会报错。

Buffer.memory:

该参数指定是producer端用于缓存消息的缓冲大小,单位是字节,默认是33554432,即是32MB。如上所述,采用异步发送消息的设计架构,java版本的producer会在启动的时候 先创建一块内存缓存区用于保存待发送的消息(mysql也是在服务器启动的时候会创建buffer pool缓存区),然后由另一个专属线程再去负责从缓冲区真正的执行发送,这部分空间则由buffer.memory参数指定,若producer向缓冲区写消息的速度超过了专属I/O线程发送消息的速度,那么必然会内存里数据会不断增大,此时producer会停止手头上的工作等待I/O线程执行,若一段时间还未执行完毕,则producer则会抛出异常期望用户介入。

虽然producer在工作过程中会用到很多部分内存,但我们几乎可用认定该参数指定的内存大小就是producer程序使用的内存大小。若producer程序要发送很多分区,那么就需要仔细设置该参数,防止内存过小降低了整个producer的吞吐量。

Compression.type

这个参数设置producer端是否压缩消息,默认值是none,即不压缩消息,和任何系统相同的是,kafka的producer端引入压缩后可以显著降低I/O网络传输开销,从而提升整个吞吐量,但也会增加producer端机器cpu的开销。另外,如果broker端压缩参数设置的与producer不同时候,broker端写入消息的时候额外占用cpu资源对消息进行解压缩-重压缩操作。

目前kafka支持三种压缩方法,GZIP/snappy和LZ4,根据实际应用场景来看,producer结合LZ4性能最好。对于kafka1.0.0版本而言,参数最好设置为LZ4。Kafka在2016年8月份的时候,开发了Zstander来压缩producer消息,相信这种会在后续版本中效率是最高的。

Retries

Broker在处理写入请求的时候可能因为瞬时故障(比如kafka的leader选举或者网络抖动)导致消息发送失败。这种故障通常可以自行恢复,如果把这种错误封装进入回调函数,producer也是 自己处理重新发送,所以与其这样,还不如kafka内部自己通过这个参数来自身调用,当然前提是要设置reties参数,0以上才会重试。不过设置这个参数时候,有两点需要注意:

1、重试可能造成消息重复发送:比如由于顺时的网络抖动使得broker端已经成功写入消息,但是没有发送给producer,这时候producer会认为发送消息失败,这就需要consumer方式重复消费。但kafka0.11.0.0版本开始支持“精准一次”处理语义,从而在设计上避免了此类问题。

2、重试可能造成乱序:当producer将多个消息发送,在缓存中时候,如果发生了消息重试,可能造成消息流乱序。为了避免乱序,java版本producer提供了max.in.flight.request.per.connection参数,一旦吧该参数设置成1,表示producer在某一时刻只能发送一次。

另外两次重试时间会有间隔,防止频繁调用对系统带来的冲击,这段时间也可以通过retry.backoff.ms来指定,默认是100ms。由于leader选举换届是最常见的瞬时错误,建议通过测试来计算平均leader选举时间,根据改时间来设定reties和retry.backff.ms。

Batch.size

Batch.size是producer重要参数之一,他对于调优producer吞吐量和延迟性有着重要意义,前面说过,producer会将发往同一个分区多条消息封装进一个batch中,当batch满了的时候,当batch满了,会发送所有消息,不过不是总等待batch满了才发送,只要batch在空闲时间就会发送。

所以当batch设置过小的时候,一次请求发送的数据也少,吞吐量则会比较低。单若一个batch非常巨大的时候,那么内存也会带来更大的压力,因为 不管是否能够填充满,producer都会为该batch分配固定大小的内存,因此batch.size参数设置其实是一种时间与空间权衡的体现。默认值是16384,即16kb。这是一个很保守的数字,在实际生活中设置,通常producer吞吐量会有相应提升。

Linger.ms

上面说到batch.size时,我们提到了消息没有被填满也可以batch发送的情况,这是为什么呢,难道不是满了发送才好嘛,实际这是一种权衡,即吞吐量 和 延时之间的权衡。当前参数就是控制消息发送延迟的权衡,默认是0。大多数情况下这是合理的,不必等待是否填满,直接发送消息就好,不过这样会拉低吞吐量,可以设置成100ms。

Max.request.size

改参数在官方文档说的是,控制producer参数发送请求的大小,实际上是控制producer端发送参数最大消息。由于请求有一些头部信息,所以包含一条消息大小要比消息本体大,可以理解他当做消息最大尺寸安全。如果producer要发送消息的尺寸很大,那么这个参数就要被设置的,默认是1048576字节大小,通常无法满足企业级消息大小要求。可以后面加个0设置。

Request.timeout.ms

当producer发送请求消息给broker后,broker需要在规定时间范围内将结果返回给producer,这段时间则由改参数控制,默认是30秒。也就是说30秒内如果broker没有返回消息给producer,则会在回调函数抛出超时异常。

默认设置通常情况足够的,但是遇到发送负载很大的数据,这时候就需要考虑调整改参数,调高。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-07-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端从入门到精通 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、producer参数
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档