前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >6.ProducerConfig详解(上)

6.ProducerConfig详解(上)

作者头像
ParkJun
发布2020-07-14 10:25:44
1.6K0
发布2020-07-14 10:25:44
举报
文章被收录于专栏:ParkJun随笔ParkJun随笔

本文章中针对的是kafka-clients 1.1.0版本。 ProducerConfig 类在包org.apache.kafka.clients.producer中。

ProducerConfig各配置项

bootstrap.servers

重要性:高

类型:List

默认值:Collections.emptyList()

引导producer查找Kafka集群所有broker的引导服务地址列表。

顾名思义,该配置项是引导服务列表,即用于查找Kafka集群中所有broker的host:port列表,producer通过这些host:port与kafka集群建立连接。producer用该列表中的地址只是用于发现kafka集群中所有的服务broker,而在kafka集群中,broker可能是动态改变的。另外,Kafka机制中,可以通过某一个broker而查询到所有其他broker,所以在bootstrap.servers中,并不需要配置所有broker的host:port,理想情况下,只需要配置其中的某一个就可以了。但为了提升可用性,避免因该broker挂掉而导致无法查找,那么可以选择配置多个。

配置格式为:

代码语言:javascript
复制
host1:port1,host2:port2,...

metadata.max.age.ms

重要性:高

类型:Long

默认值:300000毫秒,即5分钟

元数据最大生存时间,每隔metadata.max.age.ms时间,producer客户端会强制刷新一遍元数据metadata,即使没有任何partition leadership主动发现新的broker或者新的partition。

元数据

元数据类org.apache.kafka.clients#Metadata中,除了记录一些和自身更新策略有关的信息(metadata的更新策略值得另开一篇文章分析)。还保存了kafka集群的一些信息,参见org.apache.kafka.common#Cluster类:

集群中所有结点broker node列表,Node结点中记录了结点的ip、port以及机架信息(rack)。

代码语言:javascript
复制
机架信息(rack):broker的机架信息,类似于Hadoop那样,可以更好地利用局部性原 

理减少集群中网络开销。如果指定了机架信息(brooker.rack), Kafka在为分区做副 

本分配时就会考虑这部分信息,尽可能地为副本挑选不同机架的broker。

集群中每一个TopicPartition,对应的分区信息PartitionInfo。org.apache.kafka.common#PartitionInfo中主要记录了如下信息:

分区所属的topic。

分区partition编号。

分区的leader所在结点。

分区副本结点列表。

分区副本同步结点队列(ISR)。

离线副本结点队列。

集群中的控制结点信息。

控制结点broker:负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作 集群中每个topic对应的所有分区列表,相当于以topic作为索引。 集群中每个topic对应的可用分区列表。 集群中每个结点broker node对应的所有分区列表,相当于以broker.id作为索引。 集群中每个结点ID(broker.id)对应的结点信息。

batch.size

重要性:高

类型:Long

默认值:16384字节,即16K

消息记录batch(批)大小控制。kafka producer在将消息记录record发送到集群时,会尝试将一批要发送到相同partition的消息记录压缩在一起,称之为batch(批)。每次request,其实不是发送一个record,而是发送若干个batch,而每个batch里面可能包含多个record。这样成批成批的发送,减少了网络请求,有助于提升producer客户端和kafka集群服务的性能。

batch.size就是用来设置一个batch的最大字节数byte。当设置为0时,表示完全禁用batch的功能。如果batch.size设置过大,那么可能造成内存浪费,因为每个发送到不同partition的record都需要预先分配一块batch.size大小的内存。

acks

重要性:高

类型:String

默认值:"1"

应答数设置。producer只有接收到来自server的acks指定数量的应答,才会认为发送给server的消息记录已送达。该配置项用于控制已发送消息记录的持久性,有以下几种设置值:

acks = 0:表示producer无需等待server端的应答消息,producer将record扔到发送缓冲区,就认为该record已经发送,然后转身走人。这种情况无法保证server端真的成功接收到该消息记录,且此时即使retries配置项也无法生效,因为producer无法知道是否失败。另外,每个record返回的offset都被设为-1。

acks = 1:表示接收该消息记录的分区leader将消息记录写入到本地log文件,就返回Acknowledgement,告知producer本次发送已完成,而无需等待其他follower分区的确认。这种情况下,可能出现消息记录没有备份的情况(follower宕机等)。

acks = all:表示消息记录只有得到分区leader以及其他分区副本同步结点队列(ISR)中的分区follower的确认之后,才能回复acknowlegement,告知producer本次发送已完成。这种情况下,只要分区副本同步结点队列(ISR)中某一个follower存活,那么消息记录就不会被丢失。这种方式最安全,但效率也最低。

acks = -1:等同于acks = all。

linger.ms

重要性:中

类型:Long

默认值:0毫秒,表示无延时,立即发送。

延迟发送消息记录的时间,上面及前面文章中也已经提到过,producer在发送消息记录record的时候,会将发送到同一个partition的records压缩在batch中。但通常这只发生在records到达速度快于records发送速度的情况下,很容易理解:如果发送速度大于record到达速度,则每来一个record都会被立即发送出去,根本不存在将多个records压缩为一个的可能。

但很多时候,即便是发送速度大于到达速度,我们也不希望每个record就发送一次,还是希望分批次发送,以减少发送次数,提升producer客户端和服务器端的性能。为此,我们需要人为地加一个发送延迟控制,即每次发送之间,存在一定的时间间隔linger.ms,在这段时间内,可能有多个records到达,此时就可以对他们分组压缩,成批次发送。这类似于TCP的拥塞控制方法。

注意:

linger.ms设置了发送延迟的最高时间上限,另一个配置项batch.size也同时控制着发送的时机。如果为某个partition压缩的batch字节数已经达到了batch.size设置的字节数,那么该batch将被立即发送到指定的partition,即使此时延迟时间还没达到linger.ms的设置。 同样的,如果延迟的时间已经达到了linger.ms的设置,那么即使压缩累积的batch没有达到batch.size设置的字节数,也会被发送到指定的partition。 linger.ms是针对每一个发送到partition的request。即不同partition的request并不是同时发送的。 延迟以为这性能降低,需要在延迟和性能之间进行平衡,找到一个合适的linger.ms值。

client.id

重要程度:中

类型:String

默认值:""

producer 客户端ID,在创建request时,会传送到kafka服务。其目的是为了跟踪记录请求的来源,虽然服务端可以通过ip/port来追踪请求的来源,但ip/port无法表达业务语义,所以,可以通过client.id来设置一个富有业务逻辑语义的名字(如PDK游戏),有助于后续的分析和记录。

send.buffer.bytes

重要程度:中

类型:int

默认值:131072字节,即128K。

TCP发送缓冲区(SO_SNDBUF)的大小,若send.buffer.bytes设为-1,则使用操作系统的默认值。

receive.buffer.bytes

重要程度:中

类型:int

默认值:32768字节,即32K。

TCP接收缓冲区(SO_RCVBUF)大小,当receive.buffer.bytes设置为-1,则使用操作系统默认的大小。

max.request.size

重要程度:中

类型:String

默认值:1048576字节,即1M。

一个请求request中最大字节数,用于控制producer发送的单个请求request中,record batches的最大数量,以避免单个请求数据过于巨大。

max.request.size & batch.size

一个请求request中,可能包含多个record batch。

max.request.size可能影响record batch的大小上限,即当batch.size 大于 max.request.size时,batch的上限就变成了 max.request.size设置的大小。

reconnect.backoff.ms

重要性:低 类型:Long 默认值:50毫秒。

重连间隔时间,避免producer客户端过于紧密循环地重连kafka服务broker。该值针对的是所有client到broker的连接。

reconnect.backoff.max.ms

重要性:低

类型:Long

默认值:1000毫秒

producer客户端连接一个kafka服务(broker)失败重连的总时间,每次连接失败,重连时间都会指数级增加,每次增加的时间会存在20%的随机抖动,以避免连接风暴。

连接风暴 应用启动的时候,经常可能发生各应用服务器的连接数异常飙升的情况。假设连接数的设置为:min值3,max值10,正常的业务使用连接数在5个左右,当重启应用时,各应用连接数可能会飙升到10个,瞬间甚至还有可能部分应用会报取不到连接。启动完成后接下来的时间内,连接开始慢慢返回到业务的正常值。这就是所谓的连接风暴。

max.block.ms

重要性:低 类型:Long 默认值:1000毫秒

该配置值控制着KafkaProducer.send()函数以及KafkaProducer.partitionsFor()函数将阻塞的最大时间。另外当发送缓冲区满或者metadata不可用时,这两个方法也会被阻塞。如果阻塞发生在用户提供的自定义序列化类serializers或者是自定义的分区类partitioner,那么这些阻塞的时间不会被计算在该配置值之类。

ProducerConfig下半部分

本文归作者所有,未经作者允许,不得转载

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • bootstrap.servers
  • metadata.max.age.ms
  • batch.size
  • acks
  • linger.ms
  • client.id
  • send.buffer.bytes
  • receive.buffer.bytes
  • max.request.size
  • max.request.size & batch.size
  • reconnect.backoff.ms
  • reconnect.backoff.max.ms
  • max.block.ms
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档