前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Kafka服务端脚本详解和优化

Apache Kafka服务端脚本详解和优化

作者头像
阿提说说
发布2022-11-18 16:44:39
5290
发布2022-11-18 16:44:39
举报
文章被收录于专栏:Java技术进阶Java技术进阶

目录

Kafka服务端脚本详解(1)-topics

kafka-topics.sh

connect-distributed.sh & connect-standalone.sh

 Kafka服务端脚本详解(2)一log,verifiable

kafka-log-dirs.sh

kafka-verifiable-consumer.sh

kafka-verifiable-producer.sh

 Kafka服务端脚本详解(3)-性能测试脚本

 kafka-producer-perf-test.sh

kafka-consumer-perf-test.sh

Kafka生产者端优化

Kafka 生产者端发送延迟优化


Kafka 已经给我们提供了非常丰富的脚本,用来对Kafka进行管理和优化,该文是对Kafka服务端脚本的详解和测试,并尝试通过参数调整来调优Kafka性能

Kafka服务端脚本详解(1)-topics

kafka-topics.sh

--partitions

创建或修改主题的分区数

--replication-factor

副本因子,副本数量

--replica-assignment

手动指定分区副本分配方案,使用该参数,不用指定--partitions 和 --replication-factor

--topic

主题名称

--zookeeper

连接kafka zk地址

--alter

修改分区,副本,配置

--bootstrap-server

kafka服务器地址

--create

创建主题

--delete

删除主题

--list

列出所有的可用主题

代码语言:javascript
复制
 [root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --list
 __consumer_offsets
 first
 test
 topic-3
 topic-4
 topic-5
 topic-6
 topic-admin
 topic-create-diff
 topic-two

--describe

列出主题的详细信息

--exclude-internal

使用--list --describe 命令时是否列出内部主题,默认列出内部主题

--command-config

以配置文件的形式修改Admin Client的配置,支持的配置见org.apache.kafka.clients.admin.AdminClientConfig

代码语言:javascript
复制
//me.properties
request.timeout.ms=200000

//
bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --list  --command-config config/me.properties 

--config

在创建/修改主题的时候可以对主题默认参数进行覆盖,具体支持的参数见http://kafka.apachecn.org/documentation.html#topicconfigs 该参数将在以后废弃,请使用kafka-configs.sh

代码语言:javascript
复制
 [root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --describe
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1073741824,retention.bytes=1073741824
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

 [root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --alter --topic topic-two --config segment.bytes=1048577
 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
 Updated config for topic topic-two.
 
[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --describe --topic topic-two
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1048577
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

----delete-config

删除一个配置项

代码语言:javascript
复制
1[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --alter --delete-config segment.bytes 
2WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
3         Going forward, please use kafka-configs.sh for this functionality
4Updated config for topic topic-two.
5
6[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --describe
7Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:
8        Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

--disable-rack-aware

忽略机架信息 有两个broker,一个配了机架信息,另一个没配,在创建topic的时候就会报错

代码语言:javascript
复制
 1[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2
 2Error while executing topic command : Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 3[2018-12-27 05:22:40,834] ERROR kafka.admin.AdminOperationException: Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 4        at kafka.zk.AdminZkClient.getBrokerMetadatas(AdminZkClient.scala:71)
 5        at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:54)
 6        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:274)
 7        at kafka.admin.TopicCommand$TopicService$class.createTopic(TopicCommand.scala:134)
 8        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:266)
 9        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
10        at kafka.admin.TopicCommand.main(TopicCommand.scala)
11 (kafka.admin.TopicCommand$)
12
13[root@10 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2 --disable-rack-aware
14Created topic topic-6.

--if-exists

只有当主题存在时,相关命令才会执行,不会显示错误

代码语言:javascript
复制
 1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857 --if-exists
 2
 3[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857
 4Error while executing topic command : Topics in [] does not exist
 5[2018-12-27 06:01:25,638] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
 6        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
 7        at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:294)
 8        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
 9        at kafka.admin.TopicCommand.main(TopicCommand.scala)
10 (kafka.admin.TopicCommand$)

--if-not-exists

创建主题的时候,只有当主题不存在时,命令才执行,存在时不会报错

代码语言:javascript
复制
1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 --if-not-exists
2
3[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 
4Error while executing topic command : Topic 'topic-6' already exists.
5[2018-12-27 06:07:54,185] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-6' already exists.
6 (kafka.admin.TopicCommand$)

--topics-with-overrides

显示覆盖过配置的主题

--unavailable-partitions

查看没有leader副本的分区

代码语言:javascript
复制
1[root@10 kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --describe --unavailable-partitions
2        Topic: topic-6  Partition: 0    Leader: -1      Replicas: 1     Isr: 1

--under-replicated-partitions

查看所有包含失效副本的分区


connect-distributed.sh & connect-standalone.sh

Kafka Connect 是一款可扩展并且可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。

代码语言:javascript
复制
1bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-source.properties
2
3bin/connect-distributed.sh config/connect-distributed.properties

 Kafka服务端脚本详解(2)一log,verifiable

脚本名称

脚本用途

kafka-log-dirs.sh

查看指定broker上日志目录使用情况

kafka-verifiable-consumer.sh

检验kafka消费者

kafka-verifiable-producer.sh

检验kafka生产者


kafka-log-dirs.sh

--bootstrap-server

kafka地址

--broker-list

要查询的broker地址列表,broker之间逗号隔开,不配置该命令则查询所有broker

--topic-list

指定查询的topic列表,逗号隔开

--command-config

配置Admin Client

--describe

显示详情

代码语言:javascript
复制
1[root@10 kafka_2.11-2.2.0]# bin/kafka-log-dirs.sh --bootstrap-server 10.211.55.3:9092 --describe --broker-list 0 --topic-list first,topic-3
2Querying brokers for log directories information
3Received log directory information from brokers 0
4{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/tmp/kafka-logs","error":null,"partitions":[{"partition":"topic-3-0","size":474,"offsetLag":0,"isFuture":false},{"partition":"first-0","size":310,"offsetLag":0,"isFuture":false}]}]}]}

kafka-verifiable-consumer.sh

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--topic

要消费的topic

--group-id

消费组id

--max-messages

最大消费消息数量,默认-1,一直消费

代码语言:javascript
复制
 1#设置消费两次后,自动停止
 2[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --max-messages 2
 3{"timestamp":1558869583036,"name":"startup_complete"}
 4{"timestamp":1558869583329,"name":"partitions_revoked","partitions":[]}
 5{"timestamp":1558869583366,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 6{"timestamp":1558869590352,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":37,"maxOffset":37}]}
 7{"timestamp":1558869590366,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":38}],"success":true}
 8{"timestamp":1558869595328,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":38,"maxOffset":38}]}
 9{"timestamp":1558869595335,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":39}],"success":true}
10{"timestamp":1558869595355,"name":"shutdown_complete"}

--session-timeout

消费者会话超时时间,默认30000ms,服务端如果在该时间内没有接收到消费者的心跳,就会将该消费者从消费组中删除

--enable-autocommit

自动提交,默认false

代码语言:javascript
复制
 1#比较一下两者的差别
 2#没有--enable-autocommit
 3[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo
 4{"timestamp":1558875063613,"name":"startup_complete"}
 5{"timestamp":1558875063922,"name":"partitions_revoked","partitions":[]}
 6{"timestamp":1558875063952,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 7{"timestamp":1558875069603,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":47,"maxOffset":47}]}
 8{"timestamp":1558875069614,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":48}],"success":true}
 9
10#有--enable-autocommit
11[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --enable-autocommit
12{"timestamp":1558874772119,"name":"startup_complete"}
13{"timestamp":1558874772408,"name":"partitions_revoked","partitions":[]}
14{"timestamp":1558874772449,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
15{"timestamp":1558874820898,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":46,"maxOffset":46}]}

--reset-policy

设置消费偏移量,earliest从头开始消费,latest从最近的开始消费,none抛出异常,默认earliest

--assignment-strategy

消费者的分区配置策略, 默认 RangeAssignor

--consumer.config

配置文件


kafka-verifiable-producer.sh

该脚本可以生产测试数据发送到指定topic,并将数据已json格式打印到控制台

--topic

主题名称

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--max-messages

最大消息数量,默认-1,一直生产消息

--throughput

设置吞吐量,默认-1

--acks

指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认-1

--producer.config

配置文件

--message-create-time

设置消息创建的时间,时间戳

--value-prefix

设置消息前缀

--repeating-keys

key从0开始,每次递增1,直到指定的值,然后再从0开始

代码语言:javascript
复制
 1[root@10 kafka_2.11-2.2.0]# bin/kafka-verifiable-producer.sh --broker-list 10.211.55.3:9092 --topic first --message-create-time 1527351382000 --value-prefix 1 --repeating-keys 10 --max-messages 20
 2{"timestamp":1558877565069,"name":"startup_complete"}
 3{"timestamp":1558877565231,"name":"producer_send_success","key":"0","value":"1.0","topic":"first","partition":0,"offset":1541118}
 4{"timestamp":1558877565238,"name":"producer_send_success","key":"1","value":"1.1","topic":"first","partition":0,"offset":1541119}
 5{"timestamp":1558877565238,"name":"producer_send_success","key":"2","value":"1.2","topic":"first","partition":0,"offset":1541120}
 6{"timestamp":1558877565238,"name":"producer_send_success","key":"3","value":"1.3","topic":"first","partition":0,"offset":1541121}
 7{"timestamp":1558877565238,"name":"producer_send_success","key":"4","value":"1.4","topic":"first","partition":0,"offset":1541122}
 8{"timestamp":1558877565239,"name":"producer_send_success","key":"5","value":"1.5","topic":"first","partition":0,"offset":1541123}
 9{"timestamp":1558877565239,"name":"producer_send_success","key":"6","value":"1.6","topic":"first","partition":0,"offset":1541124}
10{"timestamp":1558877565239,"name":"producer_send_success","key":"7","value":"1.7","topic":"first","partition":0,"offset":1541125}
11{"timestamp":1558877565239,"name":"producer_send_success","key":"8","value":"1.8","topic":"first","partition":0,"offset":1541126}
12{"timestamp":1558877565239,"name":"producer_send_success","key":"9","value":"1.9","topic":"first","partition":0,"offset":1541127}
13{"timestamp":1558877565239,"name":"producer_send_success","key":"0","value":"1.10","topic":"first","partition":0,"offset":1541128}
14{"timestamp":1558877565239,"name":"producer_send_success","key":"1","value":"1.11","topic":"first","partition":0,"offset":1541129}
15{"timestamp":1558877565239,"name":"producer_send_success","key":"2","value":"1.12","topic":"first","partition":0,"offset":1541130}
16{"timestamp":1558877565240,"name":"producer_send_success","key":"3","value":"1.13","topic":"first","partition":0,"offset":1541131}
17{"timestamp":1558877565240,"name":"producer_send_success","key":"4","value":"1.14","topic":"first","partition":0,"offset":1541132}
18{"timestamp":1558877565241,"name":"producer_send_success","key":"5","value":"1.15","topic":"first","partition":0,"offset":1541133}
19{"timestamp":1558877565244,"name":"producer_send_success","key":"6","value":"1.16","topic":"first","partition":0,"offset":1541134}
20{"timestamp":1558877565244,"name":"producer_send_success","key":"7","value":"1.17","topic":"first","partition":0,"offset":1541135}
21{"timestamp":1558877565244,"name":"producer_send_success","key":"8","value":"1.18","topic":"first","partition":0,"offset":1541136}
22{"timestamp":1558877565244,"name":"producer_send_success","key":"9","value":"1.19","topic":"first","partition":0,"offset":1541137}
23{"timestamp":1558877565262,"name":"shutdown_complete"}
24{"timestamp":1558877565263,"name":"tool_data","sent":20,"acked":20,"target_throughput":-1,"avg_throughput":100.50251256281408}

 Kafka服务端脚本详解(3)-性能测试脚本

脚本名称

脚本用途

kafka-producer-perf-test.sh

kafka 生产者性能测试脚本

kafka-consumer-perf-test.sh

kafka 消费者性能测试脚本

kafka-console-producer.sh

kafka 生产者控制台

kafka-console-consumer.sh

kafka 消费者控制台

 kafka-producer-perf-test.sh

kafka 生产者性能测试脚本

--topic 消息主题名称 ----num-records 需要生产的消息数量 --payload-delimiter 指定 --payload-file 文件的分隔符,默认为换行符 \n --throughput 设置消息吞吐量,messages/sec --producer-props 发送端配置信息,配置信息优先于 --producer.config --producer.config 发送端配置文件 --print-metrics 是否打印测试指标,默认 false --transactional-id 用于测试并发事务的性能 (默认值:performance-producer-default-transactional-id) --transaction-duration-ms 事务时间最大值,超过这个值就提交事务,只有 > 0 时才生效 --record-size 每条消息字节数 --payload-file 测试数据文件

测试 10w 条数据,每条数据 1000 字节,每秒发送 2000 条数据

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.211.55.3:9092 --topic first --record-size 1000 --num-records 100000  --throughput 2000
9999 records sent, 1999.8 records/sec (1.91 MB/sec), 8.6 ms avg latency, 406.0 ms max latency.
10007 records sent, 2001.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 8.0 ms max latency.
10002 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 10.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 37.0 ms max latency.
10008 records sent, 2001.2 records/sec (1.91 MB/sec), 0.6 ms avg latency, 7.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 35.0 ms max latency.
10004 records sent, 2000.8 records/sec (1.91 MB/sec), 0.8 ms avg latency, 33.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
100000 records sent, 1999.280259 records/sec (1.91 MB/sec), 1.50 ms avg latency, 406.00 ms max latency, 1 ms 50th, 2 ms 95th, 43 ms 99th, 91 ms 99.9th.

测试结果为:每秒发送 1.91MB 数据,平均延迟 1.5ms,最大延迟 406ms, 延迟小于 1ms 占 50%,小于 2ms 占 95%...


kafka-consumer-perf-test.sh

kafka 消费者性能测试脚本 --topic 消费的主题名称 --broker-list kafka 地址 --consumer.config 消费端配置文件 --date-format 格式化时间 --fetch-size 一次请求拉取的消息大小,默认 1048576 字节 --from-latest 如果消费者还没有已建立的偏移量,就从日志中的最新消息开始,而不是最早的消息 --group 消费者组 id,默认 perf-consumer-94851 --hide-header 如果设置,就跳过打印统计信息的标题 --messages 要获取的消息数量 --num-fetch-threads 获取消息的线程数量 --print-metrics 打印指标信息 --reporting-interval 打印进度信息的间隔,默认 5000ms --show-detailed-stats 如果设置,将按 --reporting-interval 的间隔打印统计信息 --socket-buffer-size TCP 获取信息的缓存大小 默认 2097152(2M) --threads 处理线程数,默认 10 --timeout 返回记录的超时时间

测试消费 50w 条数据

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# bin/kafka-consumer-perf-test.sh --topic first --broker-list 10.211.55.3:9092 --messages 500000  --timeout 300000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-05-30 01:21:27:072, 2019-05-30 01:21:30:801, 488.6162, 131.0314, 500343, 134176.1866, 25, 3704, 131.9158, 135081.8035

测试结果为:共消费 488.6162MB 数据,每秒消费 131.0314MB, 共消费 500343 条数据,每秒消费 134176.1866 条

Kafka生产者端优化

测试环境虚拟机 CPU:2 核 RAM:2G Kafka Topic 为 1 分区,1 副本

Kafka 生产者端发送延迟优化

batch.size

batch.size 单位为字节,为了方便这里都表示为kb 默认配置,batch.size=16kb

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249892 records sent, 49978.4 records/sec (48.81 MB/sec), 153.6 ms avg latency, 537.0 ms max latency.
250193 records sent, 50038.6 records/sec (48.87 MB/sec), 1.4 ms avg latency, 12.0 ms max latency.
211747 records sent, 42349.4 records/sec (41.36 MB/sec), 194.3 ms avg latency, 1106.0 ms max latency.
1000000 records sent, 49972.515117 records/sec (48.80 MB/sec), 119.65 ms avg latency, 1106.00 ms max latency, 2 ms 50th, 488 ms 95th, 1043 ms 99th, 1102 ms 99.9th.

结果显示平均延迟有 456.94 ms,最高延迟 5308.00 ms 现在我要降低最高延迟数,batch.size 的意思是 ProducerBatch 的内存区域充满后,消息就会被立即发送,那我们把值改小看看 batch.size=8kb

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
148553 records sent, 29710.6 records/sec (29.01 MB/sec), 812.4 ms avg latency, 1032.0 ms max latency.
195468 records sent, 39093.6 records/sec (38.18 MB/sec), 735.9 ms avg latency, 907.0 ms max latency.
189700 records sent, 37940.0 records/sec (37.05 MB/sec), 763.4 ms avg latency, 1053.0 ms max latency.
208418 records sent, 41683.6 records/sec (40.71 MB/sec), 689.7 ms avg latency, 923.0 ms max latency.
196504 records sent, 39300.8 records/sec (38.38 MB/sec), 718.1 ms avg latency, 1056.0 ms max latency.
1000000 records sent, 37608.123355 records/sec (36.73 MB/sec), 741.56 ms avg latency, 1056.00 ms max latency, 725 ms 50th, 937 ms 95th, 1029 ms 99th, 1051 ms 99.9th.

但经过测试发现,延迟反而很高,连设定的 50000 吞吐量都达不到,原因应该是这样:batch.size 小了,消息很快就会充满,这样消息就会被立即发送的服务端,但这样的话发送的次数就变多了,但由于网络原因是不可控的,有时候网络发生抖动就会造成较高的延迟 那就改大看看。 batch.size=32kb

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249852 records sent, 49970.4 records/sec (48.80 MB/sec), 88.8 ms avg latency, 492.0 ms max latency.
250143 records sent, 50028.6 records/sec (48.86 MB/sec), 1.2 ms avg latency, 15.0 ms max latency.
250007 records sent, 49991.4 records/sec (48.82 MB/sec), 1.2 ms avg latency, 17.0 ms max latency.
1000000 records sent, 49952.545082 records/sec (48.78 MB/sec), 31.07 ms avg latency, 492.00 ms max latency, 1 ms 50th, 305 ms 95th, 440 ms 99th, 486 ms 99.9th.

测试后,平均延迟,最高延迟都降下来很多,而且比默认值延迟都要小很多,那再改大延迟还会降低吗 batch.size=50kb

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49970.4 records/sec (48.80 MB/sec), 27.3 ms avg latency, 219.0 ms max latency.
250200 records sent, 50030.0 records/sec (48.86 MB/sec), 1.2 ms avg latency, 8.0 ms max latency.
250098 records sent, 50019.6 records/sec (48.85 MB/sec), 18.6 ms avg latency, 288.0 ms max latency.
242327 records sent, 48407.3 records/sec (47.27 MB/sec), 121.3 ms avg latency, 920.0 ms max latency.
1000000 records sent, 49823.127896 records/sec (48.66 MB/sec), 41.98 ms avg latency, 920.00 ms max latency, 1 ms 50th, 221 ms 95th, 792 ms 99th, 910 ms 99.9th.

如上测试在不同的机器上结果会有不同,但总体的变化曲线是一样的,成 U 型变化

batch.size 代码实现

Kafka 客户端有一个 RecordAccumulator 类,叫做消息记录池,内部有一个 BufferPool 内存区域

代码语言:javascript
复制
RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool)

当该判断为 true,消息就会被发送

代码语言:javascript
复制
if (result.batchIsFull || result.newBatchCreated) {
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
}

max.in.flight.requests.per.connection

该参数可以在一个 connection 中发送多个请求,叫作一个 flight, 这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认 5

在 batch.size=100kb 的基础上,增加该参数值到 10,看看效果

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49960.4 records/sec (48.79 MB/sec), 16.1 ms avg latency, 185.0 ms max latency.
250148 records sent, 50019.6 records/sec (48.85 MB/sec), 1.3 ms avg latency, 14.0 ms max latency.
239585 records sent, 47917.0 records/sec (46.79 MB/sec), 6.4 ms avg latency, 226.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 9.83 ms avg latency, 226.00 ms max latency, 1 ms 50th, 83 ms 95th, 182 ms 99th, 219 ms 99.9th.

多次测试结果延迟都比原来降低了 10 倍多,效果还是很明显的 但物极必反,如果你再调大后,效果就不明显了,最终延迟反而变高,这个 batch.size 道理是一样的

compression.type

指定消息的压缩方式,默认不压缩

在原来 batch.size=100kb,max.in.flight.requests.per.connection=10 的基础上,设置 compression.type=gzip 看看延迟是否还可以降低

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249785 records sent, 49957.0 records/sec (48.79 MB/sec), 2.5 ms avg latency, 199.0 ms max latency.
250091 records sent, 50008.2 records/sec (48.84 MB/sec), 1.9 ms avg latency, 17.0 ms max latency.
250123 records sent, 50024.6 records/sec (48.85 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 1.89 ms avg latency, 199.00 ms max latency, 2 ms 50th, 4 ms 95th, 6 ms 99th, 18 ms 99.9th.

测试结果发现延迟又降低了,是不是感觉很强大😁

acks

指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认值 1 如果配置 acks=0 还能降低一点点延迟,就是不等待 broker 返回是否成功,发出去就完了

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249919 records sent, 49963.8 records/sec (48.79 MB/sec), 1.4 ms avg latency, 179.0 ms max latency.
250157 records sent, 50021.4 records/sec (48.85 MB/sec), 1.2 ms avg latency, 10.0 ms max latency.
250228 records sent, 50015.6 records/sec (48.84 MB/sec), 0.9 ms avg latency, 8.0 ms max latency.
1000000 records sent, 49967.521111 records/sec (48.80 MB/sec), 1.09 ms avg latency, 179.00 ms max latency, 1 ms 50th, 3 ms 95th, 4 ms 99th, 6 ms 99.9th.

通过测试上面几个参数,如果只配置其中一个,compression.type=gzip 效果是最好的

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249882 records sent, 49956.4 records/sec (48.79 MB/sec), 11.9 ms avg latency, 191.0 ms max latency.
248708 records sent, 49731.7 records/sec (48.57 MB/sec), 2.9 ms avg latency, 92.0 ms max latency.
251380 records sent, 50276.0 records/sec (49.10 MB/sec), 2.0 ms avg latency, 23.0 ms max latency.
249980 records sent, 49996.0 records/sec (48.82 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 4.55 ms avg latency, 191.00 ms max latency, 2 ms 50th, 12 ms 95th, 88 ms 99th, 163 ms 99.9th.

在当前环境下,平均延迟能只有 4.55ms, 最大延迟 191ms

如上测试是在单机1分区,1副本的情况下的,为了能看到效果,延迟只是一个指标,但实际中并不是一味追求某个指标,还需要综合考虑,比如低延迟下,还要提高吞吐量,这就会要牺牲一部分的低延迟。不同的优化点,需要调整不同的参数,具体参数可以见 https://dwz.cn/Sl5L3zoq 另外: 如果 Topic 是多分区,也有显著效果,如果还需要降低延迟,可以再通过如上的参数进行优化

比如在当前环境下,我现在要达到 10w 的吞吐量,默认配置下是达不到的

代码语言:javascript
复制
[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two    --record-size 1024 --num-records 1000000  --throughput 100000
1 records sent, 0.1 records/sec (0.00 MB/sec), 7194.0 ms avg latency, 7194.0 ms max latency.
91167 records sent, 3306.3 records/sec (3.23 MB/sec), 519.4 ms avg latency, 26096.0 ms max latency.
330075 records sent, 66015.0 records/sec (64.47 MB/sec), 2843.5 ms avg latency, 26106.0 ms max latency.
227535 records sent, 45507.0 records/sec (44.44 MB/sec), 556.2 ms avg latency, 2306.0 ms max latency.
236940 records sent, 38577.0 records/sec (37.67 MB/sec), 522.0 ms avg latency, 3439.0 ms max latency.
1000000 records sent, 18762.078088 records/sec (18.32 MB/sec), 1402.18 ms avg latency, 26106.00 ms max latency, 443 ms 50th, 4018 ms 95th, 26073 ms 99th, 26095 ms 99.9th.

通过这几个配置 batch.size=204800 compression.type=gzip  就近乎达到了 10w 的吞吐量

代码语言:javascript
复制
C[root@10 kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic tw   --record-size 1024 --num-records 2000000  --throughput 100000
397998 records sent, 79599.6 records/sec (77.73 MB/sec), 3.4 ms avg latency, 193.0 ms max latency.
489610 records sent, 97922.0 records/sec (95.63 MB/sec), 2.5 ms avg latency, 24.0 ms max latency.
522791 records sent, 104558.2 records/sec (102.11 MB/sec), 1.8 ms avg latency, 29.0 ms max latency.
485255 records sent, 96973.4 records/sec (94.70 MB/sec), 1.8 ms avg latency, 26.0 ms max latency.
2000000 records sent, 94665.593790 records/sec (92.45 MB/sec), 2.31 ms avg latency, 193.00 ms max latency, 2 ms 50th, 5 ms 95th, 12 ms 99th, 23 ms 99.9th.

 到这里Kafka的所有配置上的性能优化到此就结束了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-09-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka服务端脚本详解(1)-topics
    • kafka-topics.sh
      • connect-distributed.sh & connect-standalone.sh
      •  Kafka服务端脚本详解(2)一log,verifiable
        • kafka-log-dirs.sh
          • kafka-verifiable-consumer.sh
            • kafka-verifiable-producer.sh
            •  Kafka服务端脚本详解(3)-性能测试脚本
              •  kafka-producer-perf-test.sh
                • kafka-consumer-perf-test.sh
                • Kafka生产者端优化
                  • Kafka 生产者端发送延迟优化
                  相关产品与服务
                  批量计算
                  批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档