9.kafka shell脚本用法详解

kafka安装目录下的bin目录包含了很多运维可操作的shell脚本,列举如下:

接下来详细说明每个脚本的使用方法。

connect-distributed.sh&connect-standalone.sh

Kafka Connect是在0.9以后加入的功能,主要是用来将其他系统的数据导入到Kafka,然后再将Kafka中的数据导出到另外的系统。可以用来做实时数据同步的ETL,数据实时分析处理等。

主要有2种模式:Standalone(单机模式)和Distribute(分布式模式)。

单机主要用来开发,测试,分布式的用于生产环境。

用法比较复杂,建议参考:Kafka Connect教程详解 https://3gods.com/bigdata/Kafka-Connect-Details.html

kafka-broker-api-versions.sh

用法:bin/kafka-broker-api-versions.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094

kafka-configs.sh

配置管理脚本,这个脚本主要分两类用法:describe和alter。这个脚本主要用于显示和修改覆盖配置,即覆盖默认配置。

describe相关用法:

查看每个topic的配置:bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics

部分结果如下:

查看broker的配置:bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers --entity-name 0

说明:0是broker.id,因为entity-type为brokers,所以entity-name表示broker.id。

alter相关用法:

给指定topic增加配置:bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI--add-configretention.ms=600000

给指定topic删除配置:bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI--delete-configmax.message.bytes

通过该脚本可以管理的属性,可以通过执行得到的输出中的desc可以得到。

kafka-broker-api-versions.sh

用法:bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092

kafka-console-consumer.sh

用法:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic afei [--group groupName] [--partition 目标分区]

这个命令后面还可带很多参数:

:指定key的反序列化方式,默认是 org.apache.kafka.common.serialization.StringDeserializer

:指定value的反序列化方式,默认是 org.apache.kafka.common.serialization.StringDeserializer

:从最早的消息开始消费,默认是从最新消息开始消费。

: 从指定的消息位置开始消费,如果设置了这个参数,还需要带上。否则会提示:The partition is required when offset is specified.

:当消费者在这个参数指定时间间隔内没有收到消息就会推出,并抛出异常:kafka.consumer.ConsumerTimeoutException。

:接收的topic白名单集合,和二者取其一。例如:(以afei开头的topic),(指定afei这个topic),(指定afei或者fly这两个topic)。另外一个参数用法类似。

kafka-console-producer.sh

用法:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic afei

,如果连接集群,那么broker-list参数格式为:HOST1:PORT1,HOST2:PORT2,HOST3:PORT3

kafka-consumer-groups.sh

查看所有消费者组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

查看某个消费者组:bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group AfeiGroup --describe,输出结果如下:

输出结果列中LAG表示消费者当前offset和broker中LOG-END-OFFSET之间的差距,理想结果是0,表水没有任何延迟。如果这个值比较大,需要注意。

kafka-consumer-perf-test.sh

perf是performance的缩写,所以这个脚本是kafka消费者性能测试脚本。

用法:bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --group testGroup --topic afei --messages 1024

输出结果如下:

kafka-delete-records.sh

用法:bin/kafka-delete-records.sh --bootstrap-server 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --offset-json-file offset.json,offset.json文件内容:

执行结果如下,表示删除afei这个topic下分区为3的offset少于10的消息日志(不会删除offset=10的消息日志):

kafka-log-dirs.sh

用法:bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --topic-list afei[,topicName2,topicNameN],如果没有指定,那么会输出所有kafka消息日志目录以及目录下所有topic信息。加上参数后,输出结果如下,由这段结果可知,消息日志所在目录为,并且afei这个topic有3个分区:

kafka-preferred-replica-election.sh

用法:bin/kafka-preferred-replica-election.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --path-to-json-file afei-preferred.json(如果不带--path-to-json-file就是对所有topic进行preferred replica election),json文件内容如下::

场景:在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也均分在不同的broker上。每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。其他replica只是冷备状态,不接受读写请求。但随着时间推移,broker可能会主动停机甚至客观宕机,会发生leader选举迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。

kafka提供了一个参数自动做这件事情,且默认为true,原理是一个后台线程检查并触发leader balance。但是并不建议把这个参数设置为true。因为担心这个自动选举发生在业务高峰期,从而导致影响业务。

验证:

操作比较简单,常见一个3个分区3个副本的topic,然后kill掉一个broker。这时候topic信息如下,我们可以看到broker.id为0的broker上有两个leader:

执行脚本后,topic信息如下,leader均匀分布在3个不同的broker上,

kafka-producer-perf-test.sh

perf是performance的缩写,所以这个脚本是kafka生产者性能测试脚本。

kafka-reassign-partitions.sh

场景:将一些topic上的分区从当前所在broker移到其他比如新增的broker上。假设有个名为ORDER-DETAIL的topic,在broker.id为2的broker上:

现在想要把它移动到broker.id为1的broker上,执行脚本:bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --topics-to-move-json-file move.json --broker-list "1" --generate

参数表示生成一个分区再分配配置,并不会真正的执行,命令执行结果如下:

我们只需要把第二段json内容保存到一个新建的final.json文件中(如果知道如何编写这段json内容,那么也可以不执行第一条命令),然后执行命令:bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file move_final.json --execute,此次执行的命令带有参数,说明是真正的执行分区重分配。

通过这个命令还可以给某个topic增加副本,例如有一个名为ORDER-DETAIL的topic,有3个分区,但是只有1个副本,为了高可用,需要将副本数增加到2,那么编写replica.json文本内容如下:

然后执行命令即可:bin/kafka-reassign-partitions.sh --zookeeper 10.0.55.208:2181/wallet,10.0.55.209:2181/wallet,10.0.55.210:2181/wallet --reassignment-json-file replica.json

注意:分区重分配对集群的性能有比较大的影响,因为它会使内存页缓存发生变化,并且占用额外的网络和磁盘资源。将重分配过程拆分成几个小过程可以降低影响。

kafka-replica-verification.sh

用法:bin/kafka-replica-verification.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 [--topic-white-list afei],参数--topic-white-list指定要检查的目标topic。输出结果如下,如果输出信息为表示这个topic的复制没有任何延迟:

这个脚本使用时也要注意,会对集群造成一定影响,因为它需要读取所有消息。

kafka-server-start.sh

用法:bin/kafka-server-start.sh -daemon config/server.properties,指定配置文件并以守护进程模式启动。

kafka-server-stop.sh

用法:bin/kafka-server-stop.sh 。说明,这个命令会kill掉当前服务器上所有kafka broker。但是这个脚本可能执行结果为:

分析原因:我们先看一下脚本内容,这个脚本非常简单,就是得到所有包含kafka.Kafka的进程ID,但是由于kafka启动依赖比较多的jar,导致kafka进程的结果输出内容比较长,而输出结果受到(其值通过命令可以得到)的限制,从而导致结果中看不到,所以不能kill掉kafka server:

为了kafka-server-stop.sh脚本可以正常执行,建议修改脚本如下,通过bin脚本所在目录的上级目录来查找进程ID,从而kill相关进程:

kafka-simple-consumer-shell.sh

deprecated,用法:bin/kafka-simple-consumer-shell.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei

kafka-topics.sh

创建topic:bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic afei --partitions 3 --replication-factor 1

删除topic: bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test,broker的delete.topic.enable一定要是true才能成功删除topic,否则删除命令会被忽视。

修改topic: bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic afei --partitions 5,修改topic时只能增加分区数量。

查询topic: bin/kafka-topics.sh --zookeeper localhost:2181 --describe [ --topic afei ],查询时如果带上,那么表示只查询该topic的详细信息。这时候还可以带上 和任意一个参数。

说明:如果某些topic为了有序发送消息时会基于key,那么增加分区数量会导致key和分区的映射关系发生变化。如果这个影响不能接受,那么基于key的topic最好一开始就评估分区数量,将来尽量避免调整。

kafka-verifiable-consumer.sh

用法:bin/kafka-verifiable-consumer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei --group-id groupName

这个脚本的作用是接收指定topic的消息消费,并发出消费者事件,例如:offset提交等。

kafka-verifiable-producer.sh

用法:bin/kafka-verifiable-producer.sh --broker-list 10.0.55.229:9092,10.0.55.229:9093,10.0.55.229:9094 --topic afei [--max-messages 64],建议使用该脚本时增加参数,否则会不停的发送消息。

这个脚本的作用是持续发送消息到指定的topic中,参数限制最大发送消息数。且每条发送的消息都会有响应信息,这就是和最大的不同:

afei这个topic有3个分区,使用kafka-verifiable-producer.sh发送9条消息。根据输出结果可以看出,往每个分区发送了3条消息。另外,我们可以通过设置参数一个比较大的值,可以压测一下搭建的kafka集群环境。

zookeeper-shell.sh

用法:bin/zookeeper-shell.sh localhost:2181[/path],如果kafka集群的zk配置了chroot路径,那么需要加上/path,例如,登陆zk后,就可以查看kafka写在zk上的节点信息。例如查看有哪些broker,以及broker的详细信息:

写在最后

上面的这些kafka运维脚本,有些是指定参数--zookeeper,有些是指定参数--broker-list,有些是指定参数--bootstrap-server。

这实际上是历史问题。broker-list代表broker地址,而bootstrap-server代表连接起点,可以从中拉取broker地址信息(前面的[4. kafka生产者&消费者]已经分析过)。bootstrap-server的命名更高级点。还有通过zookeeper连接的,kafka早起很多信息存方在zk中,后期慢慢弱化了zk的作用,这三个参数代表kafka的三个时代。往好的讲是见证kafka设计的理念变迁,往坏的讲:什么**玩意儿,绕的一笔(来自厮大大的解答),哈。

  • 发表于:
  • 原文链接:https://kuaibao.qq.com/s/20180804G08XOR00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券