先说结论,Kafka 部署在 Linux 上要比 Windows 和 Mac 上性能高的多,主要是以下几个原因:
主流的 I/O 模型通常有以下五种类型:
你不必详细了解每一种模型的实现细节,通常情况下我们认为后一种模型会比前一种模型要高级,比如 epoll 就比 select 要好,了解到这一程度应该足以应付我们下面的内容了。
说了这么多,I/O 模型与 Kafka 的关系又是什么呢?实际上 Kafka 客户端底层使用了 Java 的 selector,selector 在 Linux 上的实现机制是 epoll,而在 Windows 平台上的实现机制是 select。因此在这一点上将 Kafka 部署在 Linux 上是有优势的,因为能够获得更高效的 I/O 性能。
Kafka 在 Linux 上支持零拷贝(Zero-copy)的主要原因是 Linux 操作系统提供了一些特性和系统调用,使得零拷贝成为可能。而 Windows 操作系统在设计上与 Linux 有所不同,因此不直接支持零拷贝。
零拷贝是一种优化技术,可以减少数据在内核空间和用户空间之间的拷贝次数,提高数据传输的效率。在传统的拷贝方式中,数据从磁盘读取到内核缓冲区,然后再从内核缓冲区拷贝到用户空间的应用程序缓冲区,这涉及到多次数据拷贝操作,增加了 CPU 和内存的开销。
在 Linux 上,零拷贝的实现主要依赖以下几个特性和系统调用:
通过使用 sendfile 系统调用,Kafka 可以直接将数据从磁盘读取到内核缓冲区,然后通过网络发送给消费者,避免了数据在内核空间和用户空间之间的多次拷贝。
而在 Windows 上,没有类似于 Linux 的 sendfile 系统调用,因此无法直接实现零拷贝。在 Windows 上,数据需要经过内核空间和用户空间之间的多次拷贝,导致性能上的损失。
一句话总结一下,在 Linux 部署 Kafka 能够享受到零拷贝技术所带来的快速数据传输特性。
先说结论:
Kafka 是一个高吞吐量、低延迟的分布式消息系统,它的性能和稳定性对于线上环境非常重要。虽然 SSD(固态硬盘)在性能方面有明显的优势,但机械磁盘仍然可以胜任 Kafka 线上环境的原因如下:
需要注意的是,尽管机械磁盘可以胜任 Kafka 线上环境,但在某些特定场景下,如对延迟要求非常高的应用,或者对存储容量要求非常大的应用,可能需要考虑使用 SSD 等更高性能的存储设备。此外,随着技术的发展,未来可能会有更多新型存储设备出现,对于 Kafka 的存储需求可能会有不同的选择。
总结下磁盘容量需要考虑的因素:
我举一个简单的例子来说明该如何思考这个问题。假设你所在公司有个业务每天需要向 Kafka 集群发送 1 亿条消息,每条消息保存两份以防止数据丢失,另外消息默认保存两周时间。现在假设消息的平均大小是 1KB,那么你能说出你的 Kafka 集群需要为这个业务预留多少磁盘空间吗?
我们来计算一下:每天 1 亿条 1KB 大小的消息,保存两份且留存两周的时间,那么总的空间大小就等于 1 亿 * 1KB * 2 / 1000 / 1000 = 200GB
。一般情况下 Kafka 集群除了消息数据还有其他类型的数据,比如索引数据等,故我们再为这些数据预留出 10% 的磁盘空间,因此总的存储容量就是 220GB。既然要保存两周,那么整体容量即为 220GB _ 14,大约 3TB 左右。Kafka 支持数据的压缩,假设压缩比是 0.75,那么最后你需要规划的存储空间就是 0.75 _ 3 = 2.25TB
。
举个例子,我们使用以下假设和计算:
根据以上计算,我们得出了需要 10 台 Kafka 服务器来完成这个业务目标。这个计算还没有考虑到消息的复制,如果消息需要额外复制两份,那么总的服务器台数还要乘以 3,即需要 30 台服务器。
在实际部署中,你可以根据自己的网络环境和业务需求进行调整和优化。
另外,如果你的环境中还涉及跨机房传输,那么带宽资源的瓶颈可能会更加明显。在跨机房传输的情况下,网络延迟和带宽限制都会对性能产生影响。你可能需要考虑使用更高带宽的网络或者采取其他优化措施来解决这个问题。
总结起来,对于带宽资源的规划,你需要考虑以下几个因素:
根据以上因素,你可以计算出所需的 Kafka 服务器数量,并根据实际情况进行调整和优化。
静态参数是指在 Kafka 启动时配置的参数,一旦设置后,只能通过重启 Kafka 来更改。这些参数通常是对 Kafka 整体行为的全局设置,例如 Kafka 的监听端口、日志目录、副本数量等。静态参数的配置通常在 Kafka 的配置文件(如 server.properties)中进行。
动态参数是指在 Kafka 运行时可以动态修改的参数,而无需重启 Kafka。这些参数通常是对 Kafka 的某个特定组件或功能进行细粒度的调整。动态参数可以通过 Kafka 的命令行工具或 API 进行修改。
在 Kafka 中,Broker 是消息队列的核心组件,负责接收、存储和转发消息。为了配置存储信息,我们需要设置一些重要的参数。
/home/kafka1,/home/kafka2,/home/kafka3
。如果有条件的话,最好将这些目录挂载到不同的物理磁盘上,以提高性能和可靠性。
为什么要为 log.dirs 配置多个路径呢?这是因为多块物理磁盘同时读写数据可以提高吞吐量,同时也能实现故障转移。在 Kafka 1.1 版本之前,如果 Broker 使用的任何一块磁盘挂掉了,整个 Broker 进程都会关闭。但是从 Kafka 1.1 版本开始,引入了 Failover 功能,坏掉的磁盘上的数据会自动转移到其他正常的磁盘上,Broker 仍然可以正常工作。这个改进使得我们不再依赖 RAID 来提供数据的可靠性,而是通过多块磁盘的故障转移来实现。
需要注意的是,如果使用了多个路径,Kafka 会根据一定的策略将消息分配到不同的路径上,以实现负载均衡。同时,Kafka 也会自动管理磁盘空间,当某个路径的磁盘空间不足时,会自动将消息转移到其他路径上。
总结一下,为了配置存储信息,我们需要设置 log.dirs 参数,为其配置多个路径,最好挂载到不同的物理磁盘上。这样可以提高读写性能和实现故障转移。同时,Kafka 会自动管理磁盘空间和实现负载均衡。这些配置可以在 Kafka 的配置文件中进行设置。
ZooKeeper 是一个分布式协调框架,用于协调和管理 Kafka 集群的元数据信息。它负责保存 Kafka 集群的配置信息,例如 Broker 的运行状态、Topic 的创建情况、分区信息以及 Leader 副本的位置等。
在 Kafka 中,与 ZooKeeper 相关的最重要的参数是zookeeper.connect
。这个参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。例如,zk1:2181,zk2:2181,zk3:2181
表示连接到三个 ZooKeeper 节点,默认端口为 2181。
如果要让多个 Kafka 集群共享同一个 ZooKeeper 集群,可以使用chroot
参数来进行区分。chroot
是 ZooKeeper 的概念,类似于别名。假设有两个 Kafka 集群,分别命名为 kafka1 和 kafka2,那么可以将zookeeper.connect
参数设置为zk1:2181,zk2:2181,zk3:2181/kafka1
和zk1:2181,zk2:2181,zk3:2181/kafka2
。这样就可以通过chroot
来区分不同的 Kafka 集群。
需要注意的是,chroot
只需要在参数中指定一次,并且应该添加到最后。有时候会遇到这样的错误格式:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3
,这是不正确的。
总结一下,设置与 ZooKeeper 相关的参数时,需要注意以下几点:
zookeeper.connect
参数是一个 CSV 格式的字符串,用于指定连接到 ZooKeeper 集群的地址和端口。 chroot
参数来进行区分。 chroot
参数只需要在参数中指定一次,并且应该添加到最后。 下面是一个示例配置:
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
这个配置表示连接到三个 ZooKeeper 节点,并使用kafka1
作为chroot
。
在 Kafka 中,listeners 参数用于指定外部连接者通过什么协议访问 Kafka 服务。它是一个逗号分隔的三元组列表,每个三元组由协议名称、主机名和端口号组成。协议名称可以是标准的协议,如 PLAINTEXT 表示明文传输,SSL 表示使用 SSL 或 TLS 加密传输等,也可以是自定义的协议名称。
举个例子,如果你定义了一个名为 CONTROLLER 的自定义协议,你可以在 listeners 参数中添加 CONTROLLER://localhost:9092
,表示该协议通过 localhost 的 9092 端口进行通信。
需要注意的是,如果你自定义了协议名称,你还需要通过 listener.security.protocol.map
参数告诉 Kafka 使用哪种安全协议。比如,如果你定义了 CONTROLLER 协议,并且该协议使用明文传输数据,你需要设置 listener.security.protocol.map=CONTROLLER:PLAINTEXT
。
另外,主机名和端口号比较直观,不需要过多解释。但是需要注意的是,建议在 Broker 端和客户端应用的配置中都使用主机名而不是 IP 地址。因为在 Kafka 的源代码中,也是使用主机名进行连接的。如果你在某些地方使用了 IP 地址进行连接,可能会导致连接失败的问题。
总结一下,listeners 参数用于指定 Kafka 服务的监听器,告诉外部连接者通过什么协议访问 Kafka。它是一个三元组列表,每个三元组由协议名称、主机名和端口号组成。建议使用主机名而不是 IP 地址进行配置。如果使用自定义协议,还需要通过 listener.security.protocol.map
参数指定安全协议。
该参数用于控制是否允许自动创建 Topic。建议将该参数设置为 false,即不允许自动创建 Topic。
在线上环境中,如果该参数被设置为 true,可能会导致出现很多名字稀奇古怪的 Topic。例如,当我们想要为名为 test 的 Topic 发送事件时,由于拼写错误将 test 写成了 tst,启动生产者程序后,一个名为 tst 的 Topic 就会被自动创建。这种情况下,好的运维应该防止这种情况的发生,特别是对于大公司而言,每个部门被分配的 Topic 应该由运维严格把控,不允许自行创建任何 Topic。
该参数用于控制是否允许 Unclean Leader 选举。Unclean Leader 选举是指在 Kafka 中,当保存数据较多的副本都挂掉时,是否允许从保存数据较少的副本中选举出新的 Leader。
在 Kafka 中,每个分区都有多个副本来提供高可用性,其中只有一个副本对外提供服务,即 Leader 副本。只有保存数据较多的副本才有资格竞选 Leader,而那些落后进度太多的副本没有资格竞选。
如果设置unclean.leader.election.enable
为 false,那么 Kafka 将坚持之前的原则,坚决不允许那些落后太多的副本竞选 Leader。这样做的后果是该分区将不可用,因为没有 Leader。
如果设置unclean.leader.election.enable
为 true,那么 Kafka 允许从那些保存数据较少的副本中选举出新的 Leader。这样做的后果是数据有可能丢失,因为这些副本保存的数据本来就不全,当成为 Leader 后,它本身就变得膨胀了,认为自己的数据才是权威的。
需要注意的是,该参数在最新版的 Kafka 中默认为 false。但是由于社区对该参数的默认值进行了多次更改,所以建议在使用时显式地将其设置为 false。
该参数用于控制是否允许 Kafka 定期进行 Leader 选举。建议将该参数设置为 false。
设置auto.leader.rebalance.enable
为 true 表示允许 Kafka 定期对一些 Topic 分区进行 Leader 重选举。需要满足一定的条件才会触发 Leader 重选举。
与unclean.leader.election.enable
参数不同的是,auto.leader.rebalance.enable
并不是选举新的 Leader,而是更换现有的 Leader。例如,如果 Leader A 一直表现良好,但是当auto.leader.rebalance.enable
为 true 时,经过一段时间后,Leader A 可能会被强制卸任,换成 Leader B。
需要注意的是,Leader 的更换代价很高。原本向 Leader A 发送请求的所有客户端都需要切换成向 Leader B 发送请求。而且这种 Leader 的更换本质上没有任何性能收益。
因此,在生产环境中,建议将auto.leader.rebalance.enable
设置为 false,避免不必要的 Leader 更换。
在 Kafka 中,有一组参数用于控制数据的留存。下面我将逐个介绍这些参数。
log.retention.{hours|minutes|ms}
:这是一组参数,用于控制消息数据在 Kafka 中保存的时间。这三个参数分别是以小时(hours)、分钟(minutes)和毫秒(ms)为单位的时间间隔。优先级上,ms 设置最高,minutes 次之,hours 最低。通常情况下,我们会设置较长的时间间隔,比如 log.retention.hours=168 表示默认保存 7 天的数据,自动删除 7 天前的数据。如果将 Kafka 用作存储系统,那么这个值可能需要相应调大。
log.retention.bytes
:这个参数用于指定 Broker 在磁盘上保存的消息数据的总容量大小。默认值为-1,表示没有容量限制,即可以保存任意大小的数据。这个参数在构建云上的多租户 Kafka 集群时发挥作用。假设你要提供一个云上的 Kafka 服务,每个租户只能使用 100GB 的磁盘空间,为了避免某个租户占用过多的磁盘空间,设置这个参数就非常重要了。
message.max.bytes
:这个参数用于控制 Broker 能够接收的最大消息大小。默认值为 1000012,即不到 1MB。然而,在实际场景中,超过 1MB 的消息是很常见的。因此,在生产环境中,将这个值设置得比较大是比较保险的做法。这个参数只是一个标尺,仅仅衡量 Broker 能够处理的最大消息大小,即使设置得大一点也不会占用太多磁盘空间。
需要注意的是,这些参数都是可配置的,可以根据实际需求进行调整。在配置文件中,可以通过设置对应的属性来修改这些参数的值。例如,可以在server.properties
文件中添加以下配置来修改log.retention.hours
参数的值:
log.retention.hours=168
这样就将消息数据的保存时间设置为 7 天。
总结一下,这些参数在 Kafka 中起到了重要的作用,可以根据实际需求来调整,以满足不同的业务场景。
Topic 级别的参数在 Kafka 中非常重要,它允许我们为每个 Topic 设置特定的参数值,这些参数会覆盖全局 Broker 参数的值。这样做的好处是可以根据不同的业务需求,为不同的 Topic 设置不同的参数,提高系统的灵活性和效率。
下面我将详细介绍几个重要的 Topic 级别参数,按照用途分组。
retention.ms
:规定了该 Topic 消息被保存的时长。默认值是 7 天,即该 Topic 只保存最近 7 天的消息。如果设置了这个值,它会覆盖 Broker 端的全局参数值。通过设置不同的 retention.ms 值,我们可以根据业务需求来控制消息的保存时长,避免无效的数据占用过多的存储空间。 retention.bytes
:规定了为该 Topic 预留的磁盘空间大小。和全局参数的作用类似,这个值在多租户的 Kafka 集群中非常有用。默认值是-1,表示可以无限使用磁盘空间。通过设置不同的 retention.bytes 值,我们可以根据不同的 Topic 的数据量来合理分配磁盘空间,避免存储空间不足的问题。 max.message.bytes
:决定了 Kafka Broker 能够正常接收该 Topic 的最大消息大小。在很多公司中,Kafka 作为基础架构组件运行,承载了大量的业务数据。如果在全局层面上无法给出一个合适的最大消息值,那么允许不同的业务部门自行设定 Topic 级别的 max.message.bytes
参数就显得非常必要了。通过设置不同的 max.message.bytes
值,我们可以根据不同的业务需求来控制消息的大小,确保系统能够正常处理各种大小的消息。 通过设置 Topic 级别的参数,我们可以根据不同的业务需求来灵活地调整 Kafka 的配置,提高系统的性能和可用性。同时,这也是 Kafka 作为一个高性能分布式消息系统的重要特性之一。
需要注意的是,Topic 级别的参数只对该 Topic 中的消息生效,不会影响其他 Topic。如果没有为某个 Topic 设置特定的参数值,那么将会使用全局 Broker 参数的默认值。
除了上述介绍的参数,Kafka 还有其他一些 Topic 级别的参数,如cleanup.policy
、compression.type
等,它们都可以根据具体的业务需求进行设置。在实际应用中,我们可以根据不同的场景和需求,灵活地使用这些参数来优化 Kafka 集群的性能和可靠性。
总结一下,Topic 级别的参数允许我们为每个 Topic 设置特定的参数值,覆盖全局 Broker 参数的值。通过设置不同的参数值,我们可以根据业务需求来控制消息的保存时长、磁盘空间使用和消息大小等,提高系统的灵活性和效率。这是 Kafka 作为一个高性能分布式消息系统的重要特性之一。
在 Kafka 中,可以通过两种方式来设置 Topic 级别的参数:在创建 Topic 时设置和修改已存在的 Topic 时设置。
1. 创建 Topic 时设置参数
在创建 Topic 时,可以通过--config
参数来设置 Topic 级别的参数。例如,我们要创建一个名为transaction
的 Topic,并设置retention.ms
为 15552000000,max.message.bytes
为 5242880,可以使用以下命令:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880
在上述命令中,--config
后面的参数用于指定要设置的 Topic 级别参数。
2. 修改已存在的 Topic 时设置参数
可以使用kafka-configs
命令来修改已存在的 Topic 的参数。假设我们要将transaction
Topic 的max.message.bytes
修改为 10485760,可以使用以下命令:
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
在上述命令中,--entity-type topics
表示要修改的实体类型为 Topic,--entity-name transaction
表示要修改的 Topic 名称,--alter
表示要进行修改操作,--add-config
后面的参数用于指定要修改的 Topic 级别参数及其新值。
个人建议
个人建议始终坚持使用第二种方式来设置 Topic 级别参数,并且在未来,Kafka 社区很有可能统一使用kafka-configs
脚本来调整 Topic 级别参数。这样做的好处是统一了设置参数的方式,减少了学习成本和混淆,同时也更加方便管理和维护。
JVM 参数对于 Kafka 集群的性能和稳定性非常重要。在设置 JVM 参数之前,首先需要确定 Java 版本。对于 Kafka 来说,不推荐在 Java 6 或 7 的环境上运行,建议至少使用 Java 8。
在 JVM 参数设置中,堆大小是一个关键参数。尽管后面我们还会讨论如何调优 Kafka 性能的问题,但是现在我想给出一个通用的建议:将 JVM 堆大小设置为 6GB,这是目前业界普遍认可的一个合理值。很多人使用默认的堆大小来运行 Kafka,但是默认的 1GB 有点小,因为 Kafka Broker 在与客户端进行交互时会在 JVM 堆上创建大量的 ByteBuffer 实例,堆大小不能太小。
另一个重要的 JVM 参数是垃圾回收器(GC)的设置。如果你仍在使用 Java 7,可以根据以下规则选择合适的垃圾回收器:如果 Broker 所在机器的 CPU 资源非常充裕,建议使用 CMS(Concurrent Mark Sweep)收集器,启用方法是指定-XX:+UseConcMarkSweepGC
。否则,使用吞吐量收集器,启用方法是指定-XX:+UseParallelGC
。如果你使用 Java 8,可以手动设置使用 G1(Garbage First)收集器。在没有任何调优的情况下,G1 表现要比 CMS 更出色,主要体现在更少的 Full GC 和需要调整的参数更少等方面,所以使用 G1 就可以了。
现在我们确定了要设置的 JVM 参数,接下来我们来为 Kafka 进行设置。奇怪的是,这个问题在 Kafka 官网上居然没有被提及。实际上,设置的方法非常简单,你只需要设置下面这两个环境变量即可:
KAFKA_HEAP_OPTS
:指定堆大小。例如,你可以这样设置: export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
KAFKA_JVM_PERFORMANCE_OPTS
:指定 GC 参数。例如,你可以这样设置: export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
在启动 Kafka Broker 之前,先设置好这两个环境变量,然后执行启动命令,例如:
$ export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$ export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$ bin/kafka-server-start.sh config/server.properties
这样就完成了 JVM 参数的设置。通过合理的设置,可以提高 Kafka 集群的性能和稳定性。需要注意的是,具体的参数设置可能因环境和需求而有所不同,可以根据实际情况进行调整。
Kafka 集群通常需要设置一些操作系统参数来优化性能和稳定性。下面是一些常见的操作系统参数设置:
需要注意的是,以上参数设置是一般情况下的建议,具体的设置还需要根据实际情况和硬件配置进行调整。另外,不同的操作系统和版本可能会有不同的参数设置方式,请参考相应的操作系统文档或官方建议进行设置。
下面是一个示例,展示如何在 Linux 系统上设置 ulimit -n 参数:
# 查看当前文件描述符限制
ulimit -n
# 修改文件描述符限制为 1000000
ulimit -n 1000000
# 验证修改是否生效
ulimit -n
请注意,以上示例仅适用于 Linux 系统,其他操作系统可能有不同的设置方式。
[1]
首发博客地址: https://blog.zysicyj.top/
[2]
文章更新计划: https://blog.zysicyj.top/update_plan/
[3]
系列文章地址: https://blog.zysicyj.top/categories/技术文章/后端技术/系列文章/Kafka/