专栏首页简单聊聊SparkKafka系列8:一网打尽常用脚本及配置,宜收藏落灰!

Kafka系列8:一网打尽常用脚本及配置,宜收藏落灰!

​前言

通过前面 7 篇文章的介绍,小伙伴们应该对 Kafka 运行工作原理有一个相对比较清晰的认识了。为了提高平时的工作效率,帮助我们快速定位一些线上问题,比如查看部分 Partition 堆积机器 IP 等操作,这篇文章总结了一些平时常用到的一些 Kafka 命令及常用配置,方便日后查阅(该文章中提到的相关配置会持续更新)。

文章概览

1. 常用脚本及命令总结。

2. 常用配置及说明。

常用命令总结

一. kafka-topic.sh 脚本相关常用命令,主要操作 Topic。

创建名字为 "op_log" 的 Topic。

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic op_log

查看指定 ZK 管理的 Topic 列表

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

查看指定 Topic 的详细信息,包括 Partition 个数,副本数,ISR 信息

$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe op_log

修改指定 Topic 的 Partition 个数。注意:该命令只能增加 Partition 的个数,不能减少 Partition 个数,当修改的 Partition 个数小于当前的个数,会报如下错误:Error while executing topic command : The number of partitions for a topic can only be increased

$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic op_log1 --partition 4

删除名字为 "op_log" 的 Topic。注意如果直接执行该命令,而没有设置 delete.topic.enable 属性为 true 时,是不会立即执行删除操作的,而是仅仅将指定的 Topic 标记为删除状态,之后会启动后台删除线程来删除。

$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic op_log

二. kafka-consumer-groups.sh 脚本常用命令,主要用于操作消费组相关的。

查看消费端的所有消费组信息(ZK 维护消费信息)

$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

查看消费端的所有消费组信息(Kafka 维护消费信息)

$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

查看指定 group 的详细消费情况,包括当前消费的进度,消息积压量等等信息(ZK 维护消息信息)

$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-1291 --describe

查看指定 group 的详细消费情况,包括当前消费的进度,消息积压量等等信息(Kafka 维护消费信息)

$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group mygroup --describe

三. kafka-consumer-offset-checker.sh 脚本常用命令,用于检查 OffSet 相关信息。(注意:该脚本在 0.9 以后可以使用 kafka-consumer-groups.sh 脚本代替,官方已经标注为 deprecated 了)

查看指定 group 的 OffSet 信息、所有者等信息

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291

查看指定 group 的 OffSet 信息,包括消费者所在的 IP 信息等等

$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291 --broker-info

四. kafka-configs.sh 脚本常用命令,该脚本主要用于增加/修改 Kafka 集群的配置信息。

对指定 entry 添加配置,其中 entry 可以是 topics、clients;如下,给指定的 topic 添加两个属性,分别为 max.message.byte、flush.message。

$bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --add-config'max.message.bytes=50000000,flush.message=5'

对指定 entry 删除配置,其中 entry 可以是 topics、clients;如下。对指定的 topic 删除 flush.message 属性。

$bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --delete-config'flush.message'

查看指定 topic 的配置项。

$ bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name mytopic --describe

五. kafka-reassign-partitions.sh 脚本相关常用命令,主要操作 Partition 的负载情况。

生成 partition 的移动计划,--broker-list 参数指定要挪动的 Broker 的范围,--topics-to-move-json-file 参数指定 Json 配置文件

$bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list"0"--topics-to-move-json-file ~/json/op_log_topic-to-move.json --generate

执行 Partition 的移动计划,--reassignment-json-file 参数指定 RePartition 后 Assigned Replica 的分布情况。

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --execute

检查当前 rePartition 的进度情况。

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --verify

常用配置及说明

kafka 常见重要配置说明,分为四部分

Broker Config:kafka 服务端的配置

Producer Config:生产端的配置

Consumer Config:消费端的配置

Kafka Connect Config:kafka 连接相关的配置

Broker Config

zookeeper.connect

连接 zookeeper 集群的地址,用于将 kafka 集群相关的元数据信息存储到指定的 zookeeper 集群中

advertised.port

注册到 zookeeper 中的地址端口信息,在 IaaS 环境中,默认注册到 zookeeper 中的是内网地址,通过该配置指定外网访问的地址及端口,advertised.host.name 和 advertised.port 作用和 advertised.port 差不多,在 0.10.x 之后,直接配置 advertised.port 即可,前两个参数被废弃掉了。

auto.create.topics.enable

自动创建 topic,默认为 true。其作用是当向一个还没有创建的 topic 发送消息时,此时会自动创建一个 topic,并同时设置 -num.partition 1 (partition 个数) 和 default.replication.factor (副本个数,默认为 1) 参数。

一般该参数需要手动关闭,因为自动创建会影响 topic 的管理,我们可以通过 kafka-topic.sh 脚本手动创建 topic,通常也是建议使用这种方式创建 topic。在 0.10.x 之后提供了 kafka-admin 包,可以使用其来创建 topic。

auto.leader.rebalance.enable

自动 rebalance,默认为 true。其作用是通过后台线程定期扫描检查,在一定条件下触发重新 leader 选举;在生产环境中,不建议开启,因为替换 leader 在性能上没有什么提升。

background.threads

后台线程数,默认为 10。用于后台操作的线程,可以不用改动。

broker.id

Broker 的唯一标识,用于区分不同的 Broker。kafka 的检查就是基于此 id 是否在 zookeeper 中/brokers/ids 目录下是否有相应的 id 目录来判定 Broker 是否健康。

compression.type

压缩类型。此配置可以接受的压缩类型有 gzip、snappy、lz4。另外可以不设置,即保持和生产端相同的压缩格式。

delete.topic.enable

启用删除 topic。如果关闭,则无法使用 admin 工具进行 topic 的删除操作。

leader.imbalance.check.interval.seconds

partition 检查重新 rebalance 的周期时间

leader.imbalance.per.broker.percentage

标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader

log.dir / log.dirs

保存 kafka 日志数据的位置。如果 log.dirs 没有设置,则使用 log.dir 指定的目录进行日志数据存储。

log.flush.interval.messages

partition 分区的数据量达到指定大小时,对数据进行一次刷盘操作。比如设置了 1024k 大小,当 partition 积累的数据量达到这个数值时则将数据写入到磁盘上。

log.flush.interval.ms

数据写入磁盘时间间隔,即内存中的数据保留多久就持久化一次,如果没有设置,则使用 log.flush.scheduler.interval.ms 参数指定的值。

log.retention.bytes

表示 topic 的容量达到指定大小时,则对其数据进行清除操作,默认为-1,永远不删除。

log.retention.hours

标示 topic 的数据最长保留多久,单位是小时

log.retention.minutes

表示 topic 的数据最长保留多久,单位是分钟,如果没有设置该参数,则使用 log.retention.hours 参数

log.retention.ms

表示 topic 的数据最长保留多久,单位是毫秒,如果没有设置该参数,则使用 log.retention.minutes 参数

log.roll.hours

新的 segment 创建周期,单位小时。kafka 数据是以 segment 存储的,当周期时间到达时,就创建一个新的 segment 来存储数据。

log.segment.bytessegment 的大小。当 segment 大小达到指定值时,就新创建一个 segment。

message.max.bytes

topic 能够接收的最大文件大小。需要注意的是 producer 和 consumer 端设置的大小需要一致。

min.insync.replicas

最小副本同步个数。当 producer 设置了 request.required.acks 为-1 时,则 topic 的副本数要同步至该参数指定的个数,如果达不到,则 producer 端会产生异常。

num.io.threads

指定 io 操作的线程数

num.network.threads

执行网络操作的线程数

num.recovery.threads.per.data.dir

每个数据目录用于恢复数据的线程数

num.replica.fetchers

从 leader 备份数据的线程数

offset.metadata.max.bytes

允许消费者端保存 offset 的最大个数

offsets.commit.timeout.ms

offset 提交的延迟时间

offsets.topic.replication.factor

topic 的 offset 的备份数量。该参数建议设置更高保证系统更高的可用性

port

端口号,Broker 对外提供访问的端口号。

request.timeout.ms

Broker 接收到请求后的最长等待时间,如果超过设定时间,则会给客户端发送错误信息

zookeeper.connection.timeout.ms

客户端和 zookeeper 建立连接的超时时间,如果没有设置该参数,则使用 zookeeper.session.timeout.ms 值

connections.max.idle.ms

空连接的超时时间。即空闲连接超过该时间时则自动销毁连接。

Producer Config

bootstrap.servers

服务端列表。即接收生产消息的服务端列表

key.serializer

消息键的序列化方式。指定 key 的序列化类型

value.serializer

消息内容的序列化方式。指定 value 的序列化类型

acks

消息写入 Partition 的个数。通常可以设置为 0,1,all;当设置为 0 时,只需要将消息发送完成后就完成消息发送功能;当设置为 1 时,即 Leader Partition 接收到数据并完成落盘;当设置为 all 时,即主从 Partition 都接收到数据并落盘。

buffer.memory

客户端缓存大小。即 Producer 生产的数据量缓存达到指定值时,将缓存数据一次发送的 Broker 上。

compression.type

压缩类型。指定消息发送前的压缩类型,通常有 none, gzip, snappy, or, lz4 四种。不指定时消息默认不压缩。

retries

消息发送失败时重试次数。当该值设置大于 0 时,消息因为网络异常等因素导致消息发送失败进行重新发送的次数。

Consumer Config

bootstrap.servers

服务端列表。即消费端从指定的服务端列表中拉取消息进行消费。

key.deserializer

消息键的反序列化方式。指定 key 的反序列化类型,与序列化时指定的类型相对应。

value.deserializer

消息内容的反序列化方式。指定 value 的反序列化类型,与序列化时指定的类型相对应。

fetch.min.bytes

抓取消息的最小内容。指定每次向服务端拉取的最小消息量。

group.id

消费组中每个消费者的唯一表示。

heartbeat.interval.ms

心跳检查周期。即在周期性的向 group coordinator 发送心跳,当服务端发生 rebalance 时,会将消息发送给各个消费者。该参数值需要小于 session.timeout.ms,通常为后者的 1/3。

max.partition.fetch.bytes

Partition 每次返回的最大数据量大小。

session.timeout.ms

consumer 失效的时间。即 consumer 在指定的时间后,还没有响应则认为该 consumer 已经发生故障了。

auto.offset.reset

当 kafka 中没有初始偏移量或服务器上不存在偏移量时,指定从哪个位置开始消息消息。earliest:指定从头开始;latest:从最新的数据开始消费。

Kafka Connect Config

group.id

消费者在消费组中的唯一标识

internal.key.converter

内部 key 的转换类型。

internal.value.converter

内部 value 的转换类型。

key.converter

服务端接收到 key 时指定的转换类型。

value.converter

服务端接收到 value 时指定的转换类型。

bootstrap.servers

服务端列表。

heartbeat.interval.ms

心跳检测,与 consumer 中指定的配置含义相同。

session.timeout.ms

session 有效时间,与 consumer 中指定的配置含义相同。

总结

本文总结了平时经常用到的一些 Kafka 配置及命令说明,方便随时查看;喜欢的朋友可以收藏以备不时之需。

下篇文章我们来分析一些经常在面试中碰到的问题及相应的解决办法,敬请期待。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Kafka系列文章第2篇之安装测试

    上篇文章讲解了 Kafka 的基础概念和架构,了解了基本概念之后,必须得实践一波了,所谓“实践才是检验真理的唯一办法”,后续系列关于 Kafka 的文章都以 k...

    z小赵
  • spark-1.3.0的编译及三种运行模式的配置

    最近安装spark1.3.0并与Hadoop2.5.0集成,在Spark的历史资源管理中心没有找到对应的版本,而且在网上也没有找到对应版本的编译教程,于是只能根...

    z小赵
  • Spark内核分析之Master的注册机制实现原理

            这篇文章我们来讨论一下Master的注册机制;那么有哪些信息需要注册到Master上面去呢?很简单,分别有Worker的注册,Driver的注册...

    z小赵
  • python 操作kafka

    最近项目中总是跟java配合,我一个写python的程序员,面对有复杂数据结构的java代码转换成python代码,确实是一大难题,有时候或多或少会留有一点坑...

    py3study
  • 视频追踪之目标选择(一)------边缘检测值程序

    const char *pstrWindowsBinaryTitle = "二值图";

    钱塘小甲子
  • python学习之GUI(pygame)

    py3study
  • Pygame常用方法

    skylark
  • 【MYSQL函数】MYSQL中IF函数在where中的使用

    WHERE IF(条件, true执行条件, false执行条件) 业务需求: 查询SUPPLIER_CLASS=0 and tp1.`TYPE_FLAG...

    用户5640963
  • 服务发现框架选型: Consul、Zookeeper还是etcd ?

    本文并不介绍服务发现的基本原理。除了一致性算法之外,其他并没有太多高深的算法,网上的资料很容易让大家明白上面是服务发现。 想直接查看结论的同学,请直接跳到文末。...

    sunsky
  • SAP Hybris Commerce帮助文档的结构

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明...

    Jerry Wang

扫码关注云+社区

领取腾讯云代金券