Apache Kafka的安装与配置(一)

Apache Kafka的安装与配置

导言:

作为一个高吞吐量的开源分布式发布订阅信息系统,Apache Kafka的应用场景非常广阔。特别在Apache Hadoop、HyperLedger Fabric等大型项目中,Apache Kafka都是以关键组件的面目出现在大家面前。

为方便大家对Apache Kafka的学习和使用,笔者对Apache Kafka进行了概念、配置等相关介绍,并以SUSE12为平台搭建了相关环境,供大家学习和参考。

01

基本概念和定义

1. Kafka简介

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。

Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。Kafka非常快,处理速率可达2百万写/秒。

Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存;这使得将数据从页面缓存传输到网络套接字非常有效。

2. 概念

(1)Topics(主题)

属于特定类别的消息流称为主题。数据存储在主题中。一般一个Topic会被多个Consumer订阅。

(2)Partition(分区)

每个Topic可能有许多分区,每个分区是一个有序的、记录,不断追加一个结构化的提交日志不变的序列。每个分区中的记录都分配了一个连续的ID号,称为唯一地标识分区中每个记录的偏移量。可以这样认为,Partition是物理的概念,每个Partition相当于一个文件夹;而Topic是逻辑的概念,Producer和Consumer只要关心各自推送和订阅的Topic,无需关心整条小心存于集群的那个Broker。

(3)Partition offset(分区偏移)

每个分区消息具有称为 offset 的唯一序列标识。

(4)Replicas of Partition(分区备份)

副本只是一个分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。

(5)Segment(分区)

Segment对应于2个文件(1个索引文件,1个数据文件)。一个Partition对应于一个文件夹。逻辑上一个Partition可以包含无穷多个Segment。数据清理时,旧的Segment将直接被删除。

(6)Brokers(缓存代理)

缓存代理是负责维护发布数据的简单系统。实际使用中,每个Kafka的服务实例就是一个Broker。

a) 假设在一个主题和N个代理中有N个分区,每个代理将有一个分区。

b) 假设在一个主题中有N个分区并且有N+ M个代理(M,N>0),则第一个N代理将具有一个分区,并且下一个M代理将不具有用于该特定主题的任何分区。

c) 假设在一个主题中有N个分区并且有N-M个代理(M,N>0),每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。

(7)Kafka Cluster(Kafka集群)

Kafka网络如果存在多个代理,则该Kafka网络被称为Kafka集群。 可以扩展Kafka集群,无需停机。 这些集群用于管理消息数据的持久性和复制。

(8)Producers(生产者)

生产者是发送给一个或多个Kafka主题的消息的发布者。 生产者向Kafka经纪人发送数据。 每当生产者将消息发布给代理时,代理只需将消息附加到最后一个段文件。实际上,该消息将被附加到分区。 生产者还可以向他们选择的分区发送消息。

(9)Consumers(消费者)

Consumers从经纪人处读取数据。 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。

(10)Leader(领导者)

Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader。

(11)Follower(追随者)

跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。

3. ZooKeeper简介

Apache Kafka的一个关键依赖是Apache Zookeeper。 Zookeeper是一个分布式配置和同步服务。

在Kafka网络中,是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据,例如关于主题,代理,消费者偏移(队列读取器)等的信息。

由于所有关键信息存储在Zookeeper中,并且它通常在其整体上复制此数据,因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态,一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。

02

重要配置文件相关说明

1. Zookeeper配置(zoo.cfg)

Zookeeper是一个集群服务,集群的每个节点都需要zoo.cfg。为了避免出差错,zoo.cfg里没有跟特定节点相关的配置,所以每个节点上的这个zoo.cfg都是一模一样的。

配置要素:

(1)clientPort: 客户端连接zookeeper服务的TCP监听端口,默认2181。

(2)dataDir:存放内存数据结构的snapshot,为节点快速恢复留存。

(3)dataLogDir:存放顺序日志(WAL)。dataLogDir如果没提供的话使用的则是dataDir。

一般建议把dataDir和dataLogDir分到不同的磁盘上,这样就可以充分利用磁盘顺序写的特性。

(4)server.myid: 配置IP,Leader和Follower/Observer交换数据用端口,Zookeeper选举用端口。

典型示例:

server.1=127.0.0.1:20881:30881

server.2=127.0.0.1:20882:30882

server.3=127.0.0.1:20883:30883

在上面的例子中,我把三个zookeeper服务放到同一台机器上。server.后面的数字。这个就是myid。

myid是集群服务的唯一标示,一定要保证myid在整个集群中唯一。设定后ZooKeeper会在dataDir里会放置一个myid文件,用来唯一标识这个服务(里面就一个数字)。zookeeper会根据这个id来取出server.x上的配置。比如当前id为1,则对应着zoo.cfg里的server.1的配置。

(5)tickTime:时间单位定量。

例如,tickTime=1000表示在zookeeper里1 tick等于1000 ms,所有其他用到时间的地方都会用多少tick来表示。

(6)syncLimit:就表示follower与leader的心跳时间。

例如 syncLimit=2表示心跳时间=2 tick。

(7)maxClientCnxns:对于一个客户端的连接数限制,默认是60。实际使用中可能需要调整。

(8)minSessionTimeout:最小Session 超时时间。

(9)maxSessionTimeout:最大Session 超时时间。

一般,客户端连接zookeeper时,会设置一个session timeout,如果超过这个时间Client没有与ZooKeeper Server有联系,则这个session会被设置为过期(如果这个session上有临时节点,则会被全部删除,这就是实现集群感知的基础)。但是这个时间不是客户端可以无限制设置的,服务器可以设置这两个参数来限制客户端设置的范围。

(10)autopurge.purgeInterval:内存数据清理间隔(小时)。

(11)autopurge.snapRetainCount:清理snapshot时最大文件保留数量。

由于Client在与ZooKeeper Server交互过程中会产生非常多的日志,而且ZooKeeper Server也会将内存中的数据作为snapshot保存下来,这些数据是不会被自动删除的,这样磁盘中这样的数据就会越来越多。为保证磁盘空间不无限增长,可以通过以上两个参数来设置,让zookeeper自动删除数据。

注意:删除操作可能会影响zookeeper集群的性能,所以一般会让这个过程在访问低谷的时候进行,但是遗憾的是zookeeper并没有设置在哪个时间点运行的设置,所以有的时候我们会禁用这个自动删除的功能。

2. Kafka配置(XXX.properties)

(1)broker.id:

每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响Consumers的消息情况

(2)log.dirs:

kafka数据的存放地址,多个地址的话用逗号分割 /data/kafka-logs-1,/data/kafka-logs-2

(3)port:

broker server服务端口(默认9092)

(4)message.max.bytes:

表示消息体的最大大小,单位是字节(默认6525000)

(5)num.network.threads :

broker处理消息的最大线程数,一般情况下不需要去修改(默认4)

(6)num.io.threads:

broker处理磁盘IO的线程数,数值应该大于你的硬盘数(默认8)

(7)background.threads:

一后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改(默认4)

(8)queued.max.requests:

等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。(默认500)

(9)host.name:

broker的主机地址,若设置则绑定到该地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置

(10)socket.send.buffer.bytes:

socket的发送缓冲区,socket的调优参数SO_SNDBUFF(默认100*1024)

(11)socket.receive.buffer.bytes :

Socket的接受缓冲区,socket的调优参数SO_RCVBUFF(默认100*1024)

(12)socket.request.max.bytes :

Socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖(默认100*1024*1024)

(13)log.segment.bytes:

Topic的分区是以一堆Segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖(默认100*1024*1024)

(14)log.roll.hours :

这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖(默认24*7)

(15)log.cleanup.policy:

日志清理策略选择,选项为delete/compact。主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖(默认delete)

(16)log.retention.minutes:

数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据(默认3days)

(17)log.retention.bytes:

topic每个分区的最大文件大小,一个topic的大小限制: 分区数*log.retention.bytes。

log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖(默认-1没有大小限制)

(18)log.retention.check.interval.ms:

文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略(默认5minutes)

(19)log.cleaner.enable:是否开启日志压缩(默认false)

(20)log.cleaner.threads:日志压缩运行的线程数(默认2)

(21)log.cleaner.io.max.bytes.per.second:日志压缩时候处理的最大大小(默认None)

(22)log.cleaner.dedupe.buffer.size:日志压缩去重时候的缓存空间,在空间允许的情况下,越大越好。(默认500*1024*1024)

(23)log.cleaner.io.buffer.size:日志清理时候用到的IO块大小一般不需要修改(默认512*1024)

(24)log.cleaner.io.buffer.load.factor:日志清理中hash表的扩大因子,一般不需要修改。(默认0.9)

(25)log.cleaner.backoff.ms:检查是否处罚日志清理的间隔(默认15000)

(26)log.cleaner.min.cleanable.ratio:

对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖(默认1day)

(28)log.index.size.max.bytes:

对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖(默认10*1024*1024)

(29)log.index.interval.bytes:

当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数(默认4096)

(30)log.flush.interval.messages:

log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.(默认None)

(31)log.flush.scheduler.interval.ms:

检查是否需要固化到硬盘的时间间隔(默认3000)

(32)log.flush.interval.ms:

仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.(默认None)

(33)log.delete.delay.ms:

文件在索引中清除后保留的时间一般不需要去修改(默认60000)

(34)log.flush.offset.checkpoint.interval.ms:

控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改(默认60000)

(35)auto.create.topics.enable:

是否允许自动创建topic,若是false,就需要通过命令创建topic(默认true)

(36)default.replication.factor:

是否允许自动创建topic,若是false,就需要通过命令创建topic(默认1)

(37)num.partitions:

每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖(默认1)

(38)controller.socket.timeout.ms:

partition leader与replicas之间通讯时,socket的超时时间(默认30000)

(39)controller.message.queue.size:

partition leader与replicas数据同步时,消息的队列尺寸(默认10)

(40)replica.lag.time.max.ms:

replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中(默认10000)

(41)replica.lag.max.messages:

如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效.

通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后;如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移到其他follower中。在broker数量较少,或者网络不足的环境中,建议提高此值。(默认4000)

(42)replica.socket.timeout.ms:

follower与leader之间的socket超时时间(默认30*1000)

(43)replica.socket.receive.buffer.bytes:

leader复制时候的socket缓存大小(默认64*1024)

(44)replica.fetch.max.bytes:

replicas每次获取数据的最大大小(默认1024*1024)

(45)replica.fetch.wait.max.ms:

replicas同leader之间通信的最大等待时间,失败了会重试(默认500)

(46)replica.fetch.min.bytes:

fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件(默认1)

(47)num.replica.fetchers:

leader进行复制的线程数,增大这个数值会增加follower的IO(默认1)

(48)replica.high.watermark.checkpoint.interval.ms:

每个replica检查是否将最高水位进行固化的频率(默认5000)

(49)controlled.shutdown.enable:

是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker(默认false)

(50)controlled.shutdown.max.retries:

控制器关闭的尝试次数(默认3)

(51)controlled.shutdown.retry.backoff.ms:

每次关闭尝试的时间间隔(默认5000)

(52)leader.imbalance.per.broker.percentage:

leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡(默认10)

(53)leader.imbalance.check.interval.seconds:

检查leader是否不平衡的时间间隔(默认300)

(54)offset.metadata.max.bytes:

客户端保留offset信息的最大空间大小

(55)zookeeper.connect:

zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3(默认localhost:2181)

(56)zookeeper.session.timeout.ms=6000

ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大(默认)

(57)zookeeper.connection.timeout.ms:

ZooKeeper的连接超时时间(默认6000)

(58)zookeeper.sync.time.ms:

ZooKeeper集群中leader和follower之间的同步时间(默认2000)

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180704G146XM00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券