作者:Sriharsha Chintalapani, Jay Kumar SenSharma 译者:java达人 来源:https://community.hortonworks.com/articles/80813/kafka-best-practices-1.html
我们推荐带有G1收集器的最新java 1.8(这是新版本的默认配置)。 如果您使用的是Java 1.7和G1收集器,请确保您使用的是u51或更高版本。
JVM的推荐设置如下所示:
-Xmx8g -Xms8g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
一旦确定JVM大小,将剩余的RAM留给操作系统进行页面缓存。 您需要足够的内存给页面缓存以供活跃的读写方缓冲使用。
通常,磁盘吞吐量是性能瓶颈,磁盘多一点更好。这取决于如何配置flush行为,如果log.flush.interval.messages设置为每100k左右消息就flush,则更快的磁盘将会有所帮助。
文件描述符限制:Kafka需要为文件和网络连接打开文件描述符。 我们建议至少允许128000文件描述符。
译者注:log.flush.interval.messages是log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"****数据可靠性****"与"性能"之间做必要的权衡
可以增加最大套接字缓冲区大小以实现高性能数据传输。 更多细节在这里。
http://www.psc.edu/index.php/networking/641-tcp-tune
我们建议使用多个驱动器来获得良好的吞吐量 不要与任何其他应用程序或kafka程序日志共享相同的驱动器。
可以使用server.properties中的log.dirs配置多个驱动器。 Kafka以轮询方式将partition分配给log.dirs目录。
注意:如果partition之间的数据不平衡,可能会导致磁盘间的负载不均衡。在分配数据到空间占用较少的磁盘方面,kafka目前表现并不好。 因此,用户端很容易在1个磁盘上耗尽磁盘空间,而其他驱动器仍具有可用磁盘空间,这将导致kafka挂掉。
我们强烈建议用户对kafka驱动器的磁盘使用情况创建警报,以避免Kafka服务中断。
RAID可能会在磁盘间实现更好的负载均衡。 但是由于写入速度较慢,RAID可能会导致性能瓶颈,并减少可用磁盘空间。 尽管RAID可以容忍磁盘故障,但重建RAID阵列是I/O密集型的,导致服务器禁用。 所以RAID并没有提供太多实际可用的改进。
Kafka总是立即将数据写入文件,并允许用户配置log.flush.interval.messages以强制到达配置数量的消息flush。为了让上述配置生效,需要将log.flush.scheduler.interval.ms设置为合理的值。
此外,只要日志文件达到log.segment.bytes或log.roll.hours的值,Kafka就会将日志文件flush到磁盘。
注意:kafka的持久性并不要求将数据同步到磁盘,因为挂掉的broker可以从replica中恢复 topic-partition。 但请注意replica.lag.time.max.ms,如果follower没有发出任何fetch请求或没有从leader的log-end offset中消费,那么leader默认在10秒内将follower从ISR删除。 由于这种特性,如果您没有明确设置log.flush.interval.messages,则有可能丢失消息,尽管发生的概率很小。 如果leader挂掉了,且follower没有同步leader数据,那么在这10秒内他们仍然可能处于ISR中,那么在这段时间的消息就会丢失。
java达人注:
replica.lag.time.max.ms:replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas移出ISR(in-sync replicas),并认为它是失效的,不会再加入管理中
Kafka在磁盘上使用普通文件,因此它对特定文件系统没有硬性依赖。
我们建议使用EXT4或XFS。 最近对XFS文件系统的改进表明,它对kafka的工作负载具有更好的性能,而且不会影响稳定性。
注意:请勿使用挂载的共享驱动器和任何网络文件系统。 根据我们的经验,kafka在这类文件系统上存在索引故障。 Kafka使用MemoryMapped文件来存储offset索引,该索引在网络文件系统上存在问题。
org.apache.kafka.producer.KafkaProduer,升级到新的producer。
使用flush()设置批量大小的经验规则:
batch.size =两次flush()之间的字节总数/partition数。
如果满足以下条件之一,则batch已准备就绪:
大批量意味着
为生产者定义持久性级别。
Acks | 吞吐量 | 延迟 | 持久性 |
---|---|---|---|
0 | 高 | 低 | 不保证 |
1 | 中 | 中 | Leader |
-1 | 低 | 高 | ISR |
java达人注:
Acks参数决定了producer要求leader partition 收到确认的副本个数,如果acks设置数量为0,表示producer不会等待broker的响应,所以,producer无法知道消息是否发送成功,这样有可能会导致数据丢失,但同时,acks值为0会得到最大的系统吞吐量。若acks设置为1,表示producer会在leader partition收到消息时得到broker的一个确认,这样会有更好的可靠性,因为客户端会等待直到broker确认收到消息。若设置为-1,producer会在所有备份的partition收到消息时得到broker的确认,这个设置可以得到最高的可靠性保证。
Max.in.flight.requests.per.connection> 1意味着pipelining流水线技术。
有关Kafka微基准测试的更多详细信息:https://drive.google.com/drive/u/1/folders/0ByKuMXNl6yEPfjVTRXIwaU45Qmh1Y3ktaExQa3YwZlR6SlZQTVVMckY2RGptb09QRS0zbVE