kafka的消息是存储在硬盘上的,因为“磁盘慢”这个普遍性的认知,常常使人们怀疑一个这样的持久化结构是否能提供所需的性能。但实际上磁盘因为使用的方式不同,它可能比人们预想的慢很多也可能比人们预想的快很多;而且一个合理设计的磁盘文件结构常常可以使磁盘运行得和网络一样快。
kafka的设计是将所有的数据被直接写入文件系统上一个可暂不执行磁盘 flush 操作的持久化日志文件中。实际上这意味着这些数据是被传送到了内核的页缓存上。
我们在上一节讨论了磁盘性能。 一旦消除了磁盘访问模式不佳的情况,该类系统性能低下的主要原因就剩下了两个:
大量的小型 I/O 操作(小包问题),以及过多的字节拷贝(ZeroCopy)
小型的 I/O 操作发生在客户端和服务端之间以及服务端自身的持久化操作中。
为了避免这种情况,我们的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。服务器一次性的将多个消息快依次追加到日志文件中, Consumer 也是每次获取多个大型有序的消息块。这个简单的优化对速度有着数量级的提升。批处理允许更大的网络数据包,更大的顺序读写磁盘操作,连续的内存块等等,所有这些都使 KafKa 能将随机性突发性的消息写操作变成顺序性的写操作最终流向消费者。
另一个低效率的操作是字节拷贝,在消息量少时,这不是什么问题。但是在高负载的情况下,影响就不容忽视。为了避免这种情况,我们让 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。
broker 维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化:持久化日志块的网络传输。 现代的 unix 操作系统提供了高度优化的数据路径,用于将数据从 pagecache 转移到 socket 网络连接中;在 Linux 中系统调用sendfile 做到这一点。 为了理解 sendfile 的意义,首先要了解数据从文件到套接字的一般数据传输路径:
这显然是低效的,有四次 copy 操作和两次系统调用。使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重复数据复制。所以这种优化方式,只需要最后一步的 copy 操作,将数据复制到 NIC 缓冲区。我们预期的使用场景是一个 topic 被多个消费者消费。使用 zero-copy (零拷贝)优化,数据仅仅会被复制到 pagecache 一次,在后续的消费过程中都可以复用,而不是保存在内存中在每次消费时再复制到内核空间。这使得消息能够以接近网络连接的速度被消费。 pagecache 和 sendfile 的组合使用意味着,在一个 Kafka 集群中,大多数的(紧跟生产者的)consumer 消费时,将看不到磁盘上的读取活动,因为数据完全由缓存提供。
Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。
Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。
生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。
客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式,或者使用一些特定语义的分区函数。我们有提供特定分区的接口让用于根据指定的键值进行 hash 分区(当然也有选项可以重写分区函数),例如,如果使用用户 ID 作为 key,则用户相关的所有数据都会被分发到同一个分区上。这允许消费者在消费数据时做一些特定的本地化处理。这样的分区风格经常被设计用于一些对本地处理比较敏感的消费者。
批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如 64k 或 10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的 IO 操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。
Kafka consumer 通过向 broker 发出一个“fetch”请求来获取它想要消费的 partition。consumer 的每个请求都在 log 中指定了对应的 offset,并接收从该位置开始的一大块数据。并且可以在需要的时候通过回退到该位置再次消费对应的数据。
持续追踪已经被消费的内容是消息系统的关键性能点之一。
Kafka 使用完全不同的方式解决消息丢失问题。Kafka 的 topic 被分割成了一组完全有序的partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个consumer 的位置仅仅是一个数字,即下一条要消费的消息的 offset。这使得被消费的消息的状态信息相当少,每partition只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。
这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了,那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。
离线数据加载
可伸缩的持久化特性允许 consumer 只进行周期性的消费,例如批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。
在 Hadoop 的应用场景中,我们通过将数据加载分配到多个独立的 map 任务来实现并行化,每一个 map 任务负责一个 node/topic/partition,从而达到充分并行化。Hadoop 提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,只需要简单的从原来的位置重启即可。
Kafka 可以提供的消息交付语义保证有:
At most once - 消息可能会丢失但绝不重传 At least once - 消息可以重传但绝不丢失 Exactly once - 这可能是用户真正想要的,每条消息只被传递一次
Kafka 允许 topic 的 partition 拥有若干副本,你可以在 server 端配置 partition 的副本数量。 当集群中的节点出现故障时,能自动进行故障转移,保证数据的可用性。
创建副本的单位是 topic 的 partition ,正常情况下,每个分区都有一个 leader 和零或多个followers 。总的副本数是包含 leader 的总和。所有的读写操作都由 leader 处理,一般partition 的数量都比 broker 的数量多的多,各分区的 leader 均匀的分布在 brokers 中。所有的 followers 节点都同步 leader 节点的日志,日志中的消息和偏移量都和 leader 中的一致。(当然,在任何给定时间,leader 节点的日志末尾时可能有几个消息尚未被备份完成)。
Followers 节点就像普通的 consumer 那样从 leader 节点那里拉取消息并保存在自己的日志文件中。Followers 节点可以从 leader 节点那里批量拉取消息日志到自己的日志文件中。
与大多数分布式系统一样,自动处理故障需要精确定义节点 “alive” 的概念。Kafka 判断节点是否存活有两种方式:
我们认为满足这两个条件的节点处于 “in sync” 状态,区别于 “alive” 和 “failed” 。Leader 会追踪所有 “in sync” 的节点。如果有节点挂掉了,或是写超时,或是心跳超时,leader 就会把它从同步副本列表中移除。 同步超时和写超时的时间由 replica.lag.time.max.ms 配置确定。
分布式系统中,我们只尝试处理 “fail/recover” 模式的故障,即节点突然停止工作,然后又恢复(节点可能不知道自己曾经挂掉)的状况。Kafka 没有处理所谓的 “Byzantine” 故障,即一个节点出现了随意响应和恶意响应(可能由于 bug 或 非法操作导致)。
当所有的分区上 in sync repicas 都应用到 log 上时,消息可以认为是 “committed”,只有committed 消息才会给 consumer。这意味着 consumer 不需要担心潜在因为 leader 失败而丢失消息。而对于 producer 来说,可以依据 latency 和 durability 来权衡选择是否等待消息被committed ,这个行动由 producer 使用的 acks 设置来决定。
在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。
节点挂掉后,经过短暂的故障转移后,Kafka 将仍然保持可用性,但在网络分区( networkpartitions )的情况下可能不能保持可用性。
向 Kafka 写数据时,producers 设置 ack 是否提交完成,
0:不等待 broker 返回确认消息,
1: leader 保存成功返回或,
-1(all): 所有备份都保存成功返回。
请注意。设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置 acks = all 时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个 topic 配置,可用于优先配置消息数据持久性:
上面关于 replicated logs 的讨论仅仅局限于单一 log ,比如一个 topic 分区。但是 Kafka 集群需要管理成百上千个这样的分区。我们尝试轮流的方式来在集群中平衡分区来避免在小节点上处理大容量的 topic。
同样关于 leadership 选举的过程也同样的重要,这段时间可能是无法服务的间隔。一个原始的 leader 选举实现是当一个节点失败时会在所有的分区节点中选主。相反,我们选用 broker之一作为 “controller”, 这个 controller 检测 broker 失败,并且为所有受到影响的分区改变leader。这个结果是我们能够将许多需要变更 leadership 的通知整合到一起,让选举过程变得更加容易和快速。如果 controller 失败了,存活的 broker 之一会变成新的 controller。
日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。
迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、日志文件大小达到设置大小后就丢弃)。这样的策略非常适用于处理那些暂存的数据,例如记录每条消息之间相互独立的日志。然而在实际使用过程中还有一种非常重要的场景 – 根据 key 进行数据变更(例如更改数据库表内容),使用以上的方式显然不行。
让我们来讨论一个关于处理这样流式数据的具体的例子。假设我们有一个 topic,里面的内容包含用户的 email 地址;每次用户更新他们的 email 地址时,我们发送一条消息到这个topic,这里使用用户 Id 作为消息的 key 值。现在,我们在一段时间内为 id 为 123 的用户发送一些消息,每个消息对应 email 地址的改变。
日志压缩为我们提供了更精细的保留机制,所以我们至少保留每个 key 的最后一次更新(例如:bill@gmail.com)。这样我们保证日志包含每一个 key 的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。
日志压缩机制是更细粒度的、每个记录都保留的机制,而不是基于时间的粗粒度。这个理念是选择性删除那些有更新的变更的记录的日志。这样最终日志至少包含每个 key 的记录的最后一个状态。
这种保留策略可以针对每一个 topci 进行设置,遮掩一个集群中,可以让部分 topic 通过时间和大小保留日志,另一些可以通过压缩策略保留。
日志压缩基础
这是一个高级别的日志逻辑图,展示了 kafka 日志的每条消息的 offset 逻辑结构。
Log head 中包含传统的 Kafka 日志,它包含了连续的 offset 和所有的消息。日志压缩增加了处理 tail Log 的选项。上图展示了日志压缩的的 Log tail 的情况。tail 中的消息保存了初次写入时的 offset。 即使该 offset 的消息被压缩,所有 offset 仍然在日志中是有效的。在这个场景中,无法区分和下一个出现的更高 offset 的位置。如上面的例子中,36、37、38 是属于相同位置的,从他们开始读取日志都将从 38 开始。
压缩也允许删除。通过消息的 key 和空负载(null payload)来标识该消息可从日志中删除。这个删除标记将会引起所有之前拥有相同 key 的消息被移除(包括拥有 key 相同的新消息)。但是删除标记比较特殊,它将在一定周期后被从日志中删除来释放空间。这个时间点被称为“delete retention point”,如上图。
压缩操作通过后台周期性的拷贝日志段来完成。清除操作不会阻塞读取,并且可以被配置不超过一定 IO 吞吐来避免影响 Producer 和 Consumer。实际的日志段压缩过程有点像这样:
日志压缩的保障措施:
Log压缩的细节:
日志压缩由 log cleaner 执行,log cleaner 是一个后台线程池,它会 recopy 日志段文件,移除那些 key 存在于 Log Head 中的记录。每个压缩线程工作的步骤如下:
配置Log Cleaner
log.cleaner.enable=true
这会启动清理线程池。如果要开启特定 topic 的清理功能,需要开启特定的 log-specific 属性
log.cleanup.policy=compact
这个可以通过创建 topic 时配置或者之后使用 topic 命令实现。
producers 和 consumer 可能会产生和消费大量的消息从而导致独占 broker 资源,进而引起网络饱和,对其他 client 和 broker 造成 DOS 攻击。资源的配额保护可以有效的防止这些问题,大型的多租户集群中,因为一小部分表现不佳的客户端降低了良好的用户体验,这种情况下非常需要资源的配额保护。实际情况中,当把 Kafka 当做一种服务提供时,可以根据客户端和服务端的契约对 API 调用做限制。
默认情况下,每个唯一的客户端分组在集群上配置一个固定的限额,这个限额是基于每台服务器的 (quota.producer.default, quota.consumer.default),每个客户端能发布或获取每台服务器都的最大速率,我们按服务器 (broker) 定义配置,而不是按整个集群定义,是因为如果是集群范围需要额外的机制来共享配额的使用情况,这会导致配额机制的实现比较难。
覆盖 client-ids 默认的配额是可行的。这个机制类似于每一个 topic 日志的配置覆盖。client-id覆盖会被写到 ZooKeeper,这个覆盖会被所有的 broker 读取并且迅速加载生效。这样使得我们可以不需要重启集群中的机器而快速的改变配额。