前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka最佳实践

Kafka最佳实践

作者头像
java达人
发布2018-07-31 16:59:37
1.4K0
发布2018-07-31 16:59:37
举报
文章被收录于专栏:java达人java达人

作者:Sriharsha Chintalapani, Jay Kumar SenSharma 译者:java达人 来源:https://community.hortonworks.com/articles/80813/kafka-best-practices-1.html

Kafka Broker:

Java版本

我们推荐带有G1收集器的最新java 1.8(这是新版本的默认配置)。 如果您使用的是Java 1.7和G1收集器,请确保您使用的是u51或更高版本。

JVM的推荐设置如下所示:

代码语言:javascript
复制
-Xmx8g -Xms8g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

OS设置

一旦确定JVM大小,将剩余的RAM留给操作系统进行页面缓存。 您需要足够的内存给页面缓存以供活跃的读写方缓冲使用。

通常,磁盘吞吐量是性能瓶颈,磁盘多一点更好。这取决于如何配置flush行为,如果log.flush.interval.messages设置为每100k左右消息就flush,则更快的磁盘将会有所帮助。

文件描述符限制:Kafka需要为文件和网络连接打开文件描述符。 我们建议至少允许128000文件描述符。

译者注:log.flush.interval.messages是log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"****数据可靠性****"与"性能"之间做必要的权衡

可以增加最大套接字缓冲区大小以实现高性能数据传输。 更多细节在这里。

http://www.psc.edu/index.php/networking/641-tcp-tune

磁盘和文件系统

我们建议使用多个驱动器来获得良好的吞吐量 不要与任何其他应用程序或kafka程序日志共享相同的驱动器。

可以使用server.properties中的log.dirs配置多个驱动器。 Kafka以轮询方式将partition分配给log.dirs目录。

注意:如果partition之间的数据不平衡,可能会导致磁盘间的负载不均衡。在分配数据到空间占用较少的磁盘方面,kafka目前表现并不好。 因此,用户端很容易在1个磁盘上耗尽磁盘空间,而其他驱动器仍具有可用磁盘空间,这将导致kafka挂掉。

我们强烈建议用户对kafka驱动器的磁盘使用情况创建警报,以避免Kafka服务中断。

RAID可能会在磁盘间实现更好的负载均衡。 但是由于写入速度较慢,RAID可能会导致性能瓶颈,并减少可用磁盘空间。 尽管RAID可以容忍磁盘故障,但重建RAID阵列是I/O密集型的,导致服务器禁用。 所以RAID并没有提供太多实际可用的改进。

Log Flush管理

Kafka总是立即将数据写入文件,并允许用户配置log.flush.interval.messages以强制到达配置数量的消息flush。为了让上述配置生效,需要将log.flush.scheduler.interval.ms设置为合理的值。

此外,只要日志文件达到log.segment.bytes或log.roll.hours的值,Kafka就会将日志文件flush到磁盘。

注意:kafka的持久性并不要求将数据同步到磁盘,因为挂掉的broker可以从replica中恢复 topic-partition。 但请注意replica.lag.time.max.ms,如果follower没有发出任何fetch请求或没有从leader的log-end offset中消费,那么leader默认在10秒内将follower从ISR删除。 由于这种特性,如果您没有明确设置log.flush.interval.messages,则有可能丢失消息,尽管发生的概率很小。 如果leader挂掉了,且follower没有同步leader数据,那么在这10秒内他们仍然可能处于ISR中,那么在这段时间的消息就会丢失。

java达人注:

replica.lag.time.max.ms:replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas移出ISR(in-sync replicas),并认为它是失效的,不会再加入管理中

文件系统选择

Kafka在磁盘上使用普通文件,因此它对特定文件系统没有硬性依赖。

我们建议使用EXT4或XFS。 最近对XFS文件系统的改进表明,它对kafka的工作负载具有更好的性能,而且不会影响稳定性。

注意:请勿使用挂载的共享驱动器和任何网络文件系统。 根据我们的经验,kafka在这类文件系统上存在索引故障。 Kafka使用MemoryMapped文件来存储offset索引,该索引在网络文件系统上存在问题。

Zookeeper

  • 不要将zookeeper放在与Kafka相同的容器上
  • 我们建议将zookeeper独立开来并仅用于Kafka,其他系统不要对这个zookeeper集群产生依赖
  • 确保你分配了足够的JVM,4Gb是一个不错的开始。
  • 监视器:使用JMX metrics来监视zookeeper实例

选择Topic/Partitions

  1. Topic/Partition是Kafka中并行的单位
  2. Kafka中的Partition驱动了消费者的并行性
  3. Partition数量越多,可以添加更多的并行消费者,从而产生更高的吞吐量。
  4. 根据吞吐量要求,可以有许多的Partition: 1.让生成器调用吞吐量到单个分区是P. 2.从单个分区到消费者的吞吐量是C. 3.目标吞吐量为T. 4.所需分区=最大(T / P,T / C)
  5. 更多的分区可能会增加延迟: 1.Kafka中端到端延迟由从producer发布消息到consumer读取消息的时间来定义。 2.Kafka仅在消息提交后向消费者公开消息,即消息被复制到所有同步replica时。 3.从一个broker复制1000partition到另一个broker的可能需要20ms。 对于某些实时应用程序而言,这时间可能太长。 4.在新的kafka producer中,消息将在producer端积累。 它允许用户设置用于缓冲传入消 息内存量的上限。 在内部,生产者缓冲每个partition的消息。 当足够的数据积累或设定的时间过后,积累的消息将被删除并发送给broker。 5.partition越多,则生产者端的积累的消息将会越多。 6.同样在consumer端,它从每个分区fetch一批消息。 消费者订阅的partition越多,它所需的内存就越多。

影响性能的因素

  • 主存,更具体的,是文件系统buffer cache
  • 多个专用磁盘
  • 每个topic的partition。 更多的patititon将增加并行性
  • 以太网带宽

Kafka Broker 配置

  1. 通过export KAFKA_HEAP_OPTS设置kafka broker 的jvm
  2. Log.retention.hours。此设置控制何时删除topic中的旧消息。 考虑您的磁盘空间以及您希望消息多长时间保持可用。 活跃的消费者可以快速读取并将消息传递到目标处。
  3. Message.max.bytes。 服务器可以接收的消息的最大大小。 确保将replica.fetch.max.bytes设置为等于或大于message.max.bytes
  4. Delete.topic.enable - 这将允许用户从Kafka中删除topic。 默认设置为false。 删除topic功能仅适用于Kafka 0.9以上版本。
  5. unclean.leader.election - 默认情况下,此配置设置为true。 打开这个设置,表示用户相对于持久性,更偏好可用性。 如果一个topic-part1的leader挂了,那么replica将被重新选举为新的leader,尽管它不具备ISR资格。 这意味着数据有可能丢失。 如果持久性更重要,我们建议您将其设置为false。

Kafka Producer:

org.apache.kafka.producer.KafkaProduer,升级到新的producer。

关键配置:

  • Batch.size(批处理大小)
  • Linger.ms(批处理时间)
  • Compression.type
  • Max.in.flight.requests.per.connection(影响排序)
  • Acks(影响持久性)

性能说明

  1. 进入同一partition的生产者线程比分散到多个partition的生产者线程更快
  2. 新的Producer API提供了一个flush() 方法,客户可以选择调用。 使用的时候,两次flush()调用之间的key字节数是获得良好性能的关键因素。 微基准测试表明,大约在4MB的时候有很好的性能(我们使用了1KB大小的事件)。

使用flush()设置批量大小的经验规则:

batch.size =两次flush()之间的字节总数/partition数。

  1. 如果想让生产者吞吐量最大化并且容器上有空余的CPU和网络,则添加更多生产者进程。
  2. 性能对事件大小敏感。 在我们的测量中,1KB流式事件传输比100byte事件快。 较大的事件可能会提供更好的吞吐量。
  3. 对于linger.ms,没有简单的经验法则。 需要针对特定用例进行尝试。 对于小事件(100字节或更少),它似乎对微基准测试没有太大影响。

从producer到broker请求的生命周期

  • 从batch队列中批量调度,每个partition1个batch
  • 基于leader broker进行batch分组
  • 将组合的batch发送给broker
  • max.in.flight.requests.per.connection大于1,则进行pipelinging流水线处理

如果满足以下条件之一,则batch已准备就绪:

  • 达到Batch.size
  • 达到Linger.ms
  • 同一个broker上的另一个batch,已经准备就绪
  • flush() 或close()被调用

大批量意味着

  • 更好的压缩比率,更高的吞吐量
  • 更高的延迟

Compression.type

  • 压缩是生产者工作的主要组成部分
  • 不同压缩类型的速度差别很大
  • 压缩在用户线程中,因此如果压缩很慢,添加更多线程有助于提高吞吐量

ACKs

为生产者定义持久性级别。

Acks

吞吐量

延迟

持久性

0

不保证

1

Leader

-1

ISR

java达人注:

Acks参数决定了producer要求leader partition 收到确认的副本个数,如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待直到broker确认收到消息。若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。

Max.in.flight.requests.per.connection

Max.in.flight.requests.per.connection> 1意味着pipelining流水线技术。

  • 提供更好的吞吐量
  • 重试发生时可能导致无序传递
  • pipelining过多,会降低吞吐量

Kafka Consumer:

性能说明:

  • 在消费者方面,通常不需要调整就可以获得良好的性能。
  • 保持消费者良好表现的简单经验法则: 消费者线程数=partition数
  • 微基准测试显示与生产者相比,消费者性能对于事件大小或批次大小不敏感。 1kb和100byte事件都表现出相似的吞吐量。

有关Kafka微基准测试的更多详细信息:https://drive.google.com/drive/u/1/folders/0ByKuMXNl6yEPfjVTRXIwaU45Qmh1Y3ktaExQa3YwZlR6SlZQTVVMckY2RGptb09QRS0zbVE

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java达人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka Broker:
    • Java版本
      • OS设置
        • 磁盘和文件系统
          • Log Flush管理
            • 文件系统选择
              • Zookeeper
                • 选择Topic/Partitions
                  • 影响性能的因素
                    • Kafka Broker 配置
                    • Kafka Producer:
                      • 关键配置:
                        • 性能说明
                          • 从producer到broker请求的生命周期
                            • Compression.type
                              • ACKs
                                • Max.in.flight.requests.per.connection
                                • Kafka Consumer:
                                  • 性能说明:
                                  相关产品与服务
                                  容器服务
                                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档