欢迎您关注《大数据成神之路》
这是一篇关于 Kafka 实践的文章,内容来自 DataWorks Summit/Hadoop Summit(Hadoop Summit)上的一篇分享,里面讲述了很多关于 Kafka 配置、监控、优化的内容,绝对是在实践中总结出的精华,有很大的借鉴参考意义,本文主要是根据 PPT 的内容进行翻译及适当补充。
Kafka 的架构这里就不多做介绍了,直接步入正题。
这里主要是 Kafka 集群基本配置的相关内容。
Kafka 集群基本硬件的保证
Kafka 集群需要监控的一些指标,这些指标反应了集群的健康度。
replica.lag.time.max.ms
:默认为10000,如果 follower 落后于 leader 的消息数超过这个数值时,leader 就将 follower 从 isr 列表中移除;num.replica.fetchers
,默认为1,用于从 leader 同步数据的 fetcher 线程数;min.insync.replica
:Producer 端使用来用于保证 Durability(持久性);当发现 replica 的配置与集群的不同时,一般情况都是集群上的 replica 少于配置数时,可以从以下几个角度来排查问题:
允许不在 isr 中 replica 被选举为 leader。
unclean.leader.election.enable
:默认为 true,即允许不在 isr 中 replica 选为 leader,这个配置可以全局配置,也可以在 topic 级别配置;Broker 级别有几个比较重要的配置,一般需要根据实际情况进行相应配置的:
log.retention.{ms, minutes, hours}
, log.retention.bytes
:数据保存时间;message.max.bytes
, replica.fetch.max.bytes
;delete.topic.enable
:默认为 false,是否允许通过 admin tool 来删除 topic;unclean.leader.election.enable
= false,参见上面;min.insync.replicas
= 2:当 Producer 的 acks 设置为 all 或 -1 时,min.insync.replicas
代表了必须进行确认的最小 replica 数,如果不够的话 Producer 将会报 NotEnoughReplicas
或 NotEnoughReplicasAfterAppend
异常;replica.lag.time.max.ms
(超过这个时间没有发送请求的话,follower 将从 isr 中移除), num.replica.fetchers;replica.fetch.response.max.bytes
;zookeeper.session.timeout.ms
= 30s;num.io.threads
:默认为8,KafkaRequestHandlerPool 的大小。Partition 的增加将会带来以下几个优点和缺点:
replica.fetch.response.max.bytes
:用于限制 replica 拉取请求的内存使用;kafka-reassign-partitions.sh -- -throttle option
。kafka-producer-perf-test.sh
测试你的环境;max.in.flight.requests.per.connection
:默认为5,表示 client 在 blocking 之前向单个连接(broker)发送的未确认请求的最大数,超过1时,将会影响数据的顺序性;compression.type
:压缩设置,会提高吞吐量;acks
:数据 durability 的设置;num.replica.fetchers
(follower 同步数据的线程数)来调解;kafka-consumer-perf-test.sh
测试环境;__consumer_offsets
)offsets.topic.replication.factor
:默认为3;offsets.retention.minutes
:默认为1440,即 1day;
– MonitorISR,topicsize;fetch.min.bytes
、fetch.max.wait.ms
;max.poll.interval.ms
:调用 poll()
之后延迟的最大时间,超过这个时间没有调用 poll()
的话,就会认为这个 consumer 挂掉了,将会进行 rebalance;max.poll.records
:当调用 poll()
之后返回最大的 record 数,默认为500;session.timeout.ms
;bin/kafka-consumer-groups.sh
查看;这个是常用的配置:
block.on.buffer.full
:默认设置为 false,当达到内存设置时,可能通过 block 停止接受新的 record 或者抛出一些错误,默认情况下,Producer 将不会抛出 BufferExhaustException,而是当达到 max.block.ms
这个时间后直接抛出 TimeoutException。设置为 true 的意义就是将 max.block.ms
设置为 Long.MAX_VALUE,未来版本中这个设置将被遗弃,推荐设置 max.block.ms
。参考: