Kafka 是基于 发布与订阅 的 消息系统 。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka
是一个分布式的,可分区的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能、低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了同时搞定在线应用(消息)和离线应用(数据文件、日志),Kafka
就出现了。Kafka 可以起到两个作用:
Kafka 的主要特点?
这段是从网络上找来的。感觉想要表达的意思是
* 消息是否被处理完成,是通过 Consumer 提交消费进度给 Broker ,而不是 Broker 消息被 Consumer拉取后,就标记为已消费。 * 当 Consumer 异常崩溃时,可以重新分配消息分区到其它的 Consumer 们,然后继续消费。
聊聊 Kafka 的设计要点?
1)吞吐量
高吞吐是 Kafka 需要实现的核心目标之一,为此 kafka 做了以下一些设计:
直接使用 Linux 文件系统的 Cache ,来高效缓存数据。
采用 Linux Zero-Copy 提高发送性能。
* 传统的数据发送需要发送 4 次上下文切换。 * 采用 sendfile 系统调用之后,数据直接在内核态交换,系统上下文切换减少为 2次。《为什么Kafka这么快》https://www.jianshu.com/p/99cc19dde7df
数据在磁盘上存取代价为
O(1)
。* Kafka 以 Topic 来进行消息管理,每个 Topic 包含多个 Partition ,每个 Partition 对应一个逻辑 log,有多个 segment 文件组成。 每个 segment 中存储多条消息(见下图),消息 id 由其逻辑位置决定,即从消息 id 可直接定位到消息的存储位置,避免 id 到位置的额外映射。 每个 Partition 在内存中对应一个 index ,记录每个 segment 中的第一条消息偏移。
发布者发到某个 Topic 的消息会被均匀的分布到多个 Partition 上(随机或根据用户指定的回调函数进行分布),Broker 收到发布消息往对应 Partition 的最后一个 segment 上添加该消息。undefined 当某个 segment上 的消息条数达到配置值或消息发布时间超过阈值时,segment上 的消息会被 flush 到磁盘,只有 flush 到磁盘上的消息订阅者才能订阅到,segment 达到一定的大小后将不会再往该 segment 写数据,Broker 会创建新的 segment 文件。
2)负载均衡
3)拉取系统
由于 Kafka Broker 会持久化数据,Broker 没有内存压力,因此, Consumer 非常适合采取 pull
的方式消费数据,具有以下几点好处:
4)可扩展性
通过 Zookeeper 管理 Broker 与 Consumer 的动态加入与离开。
[Kafka
架构图](https://links.jianshu.com/go?to=http%3A%2F%2Fstatic.iocoder.cn%2Fac883ce247c1ff31c7cd4244392dcaed)
Kafka 的整体架构非常简单,是分布式架构,Producer、Broker 和Consumer 都可以有多个。
几个重要的基本概念:
* replicas:Partition 的副本集,保障 Partition 的高可用。 * leader:replicas 中的一个角色,Producer 和 Consumer 只跟 Leader 交互。 * follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的followers 中选举出一个新的 leader 继续提供服务。
Consumer group:每个 Consumer 都属于一个 Consumer group,每条消息只能被 Consumer group 中的一个 Consumer 消费,但可以被多个 Consumer group 消费。
Controller:Kafka 集群中,通过 Zookeeper 选举某个 Broker 作为 Controller ,用来进行 leader election 以及 各种 failover 。
单纯角色来说,Kafka 和 RocketMQ 是基本一致的。比较明显的差异是:
RocketMQ 从 Kafka 演化而来。
RocketMQ 没有首领分区一说,所以打上了引号。
RocketMQ 的 push 的方式,也是基于 poll 的方式的封装。
Kafka 为什么要将 Topic 进行分区?
为了负载均衡,从而能够水平拓展。
所以,Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。
[Kafka
的应用场景](https://links.jianshu.com/go?to=http%3A%2F%2Fstatic.iocoder.cn%2F3636ff4bd554ee1dfcfb92448073b5b8)
1)消息队列
比起大多数的消息系统来说,Kafka 有更好的吞吐量,内置的分区,冗余及容错性,这让 Kafka
成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并常常依赖于 Kafka
提供的强大的持久性保障。在这个领域,Kafka 足以媲美传统消息系统,如 ActiveMQ 或 RabbitMQ 。
2)行为跟踪
Kafka 的另一个应用场景,是跟踪用户浏览页面、搜索及其他行为,以发布订阅的模式实时记录到对应的 Topic
里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop / 离线数据仓库里处理。
3)元信息监控
作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。
4)日志收集
日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume 。很多人使用 Kafka 代替日志聚合(log
aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。
然而, Kafka 忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让 Kafka
处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka
提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。
5)流处理
这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的 Storm 或其他流式计算框架进行处理。很多用户会将那些从原始 Topic
来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的 Topic 下再继续后面的处理。
例如一个文章推荐的处理流程,可能是先从 RSS 数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的 Topic
中。后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的 Topic
之外,产生了一系列的实时数据处理的流程。Strom 和 Samza 是非常著名的实现这种类型数据转换的框架。
6)事件源
事件源,是一种应用程序设计的方式。该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka
可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。
7)持久性日志(Commit Log)
Kafka 可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka
中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。
image.png
1)Producer 发送消息
Producer 采用 push 模式将消息发布到 Broker,每条消息都被 append 到 Patition
中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。Producer 发送消息到 Broker
时,会根据分区算法选择将其存储到哪一个 Partition 。
其路由机制为:
写入流程:
"/brokers/.../state"
节点找到该 Partition 的 leader 。注意噢,Producer 只和 Partition 的 leader 进行交互。2)Broker 存储消息
物理上把 Topic 分成一个或多个 Patition,每个 Patition 物理上对应一个文件夹(该文件夹存储该 Patition
的所有消息和索引文件)。
3)Consumer 消费消息
high-level Consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 Consumer
所消费,且 Consumer 消费消息时不关注 offset ,最后一个 offset 由 ZooKeeper 保存(下次消费时,该 group 中的
Consumer 将从 offset 记录的位置开始消费)。
注意:
Consumer 采用 pull 模式从 Broker 中读取数据。
Kafka Producer 有哪些发送模式?
Kafka 的发送模式由 Producer 端的配置参数 producer.type
来设置。
producer.type=sync
。producer.type=async
,可以是 Producer 以 batch 的形式 push 数据,这样会极大的提高 Broker的性能,但是这样会增加丢失数据的风险。producer.type
设置为 sync 。对于异步模式,还有 4 个配套的参数,如下:
image.png
batch.num.messages
)控制。通过增加 batch 的大小,可以减少网络请求和磁盘 IO 的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。batch.size
这个参数。Producer 会尝试批量发送属于同一个 Partition 的消息以减少请求的数量. 这样可以提升客户端和服务端的性能。默认大小是 16348 byte (16k). Kafka Consumer 是否可以消费指定的分区消息?
Consumer 消费消息时,向 Broker 发出“fetch”请求去消费特定分区的消息,Consumer
指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,Consumer 拥有了 offset
的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。
Kafka 的 high-level API 和 low-level API 的区别?
High Level API
Low Level API
Low-level API 也就是 Simple Consumer API ,实际上非常复杂。
Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话:
An NIO socket server. The threading model isundefined 1 Acceptor thread that handles new connections.undefined Acceptor has N Processor threads that each have their own selector and read requests from sockets.undefined M Handler threads that handle requests and produce responses back to the processor threads for writing.
Kafka的网络通信层模型,主要采用了 1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)
。下面的表格简要的列举了下
线程数 | 线程名 | 线程具体说明 |
---|---|---|
1 | kafka-socket-acceptor_%x | Acceptor线程,负责监听Client端发起的请求 |
N | kafka-network-thread_%d | Processor线程,负责对Socket进行读写 |
M | kafka-request-handler-_%d | Worker线程,处理具体的业务逻辑并生成Response返回 |
Kafka网络通信层的完整框架图如下图所示:
image
Kafka的网络通信层框架结构有几个重要概念:
(1) Acceptor :1个接收线程,负责监听新的连接请求,同时注册OP_ACCEPT 事件,将新的连接按照 "round robin"
方式交给对应的 Processor 线程处理;
(2) Processor :N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的
SocketChannel 注册相应的 OP_READ 事件,N 的大小由 “num.networker.threads” 决定;
(3) KafkaRequestHandler
:M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由
“num.io.threads” 决定;
(4) RequestChannel :其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列
requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方。
(5) NetworkClient :其底层是对 Java NIO
进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送;
(6) SocketServer
:其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离;
(7) KafkaServer :代表了一个Kafka Broker的实例;其startup方法为实例启动的入口;
(8) KafkaApis :Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如 “发送消息” 、
“获取消息偏移量—offset” 和 “处理心跳请求” 等;
结合Kafka网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源码部分均基于Kafka的0.11.0版本。
SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个
Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口),其实现如下:
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
// 一个broker一般只设置一个端口
config.listeners.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val processorEndIndex = processorBeginIndex + numProcessorThreads
//N 个 processor
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
//1个 Acceptor
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。
在该线程类中主要可以关注以下两个重要的变量:
(1),nioSelector:通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作;
(2),serverChannel:用于监听端口的服务端Socket套接字对象;
下面来看下Acceptor主要的run方法的源码:
def run() {
//首先注册OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
//以轮询方式查询并等待关注的事件发生
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
//如果事件发生则调用accept方法对OP_ACCEPT事件处理
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
//轮询算法
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
//代码省略
}
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
} catch {
//省略部分代码
}
}
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
在上面源码中可以看到,Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT
事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。这里,Processor是通过
round robin 方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。
Acceptor的accept()方法的作用主要如下:
(1)通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接;
(2)调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第(1)步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)
(3)将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)
Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量:
(1) newConnections :在上面的 Acceptor
一节中已经提到过,它是一种ConcurrentLinkedQueueSocketChannel类型的队列,用于保存新连接交由Processor处理的socketChannel;
(2) inflightResponses :是一个Map[String,
RequestChannel.Response]类型的集合,用于记录尚未发送的响应;
(3) selector :是一个类型为KSelector变量,用于管理网络连接;
下面先给出Processor处理器线程run方法执行的流程图:
image
从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:
(1) 处理newConnections队列中的socketChannel
。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;
(2) 处理RequestChannel中与当前Processor对应响应队列中的Response
。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;
(3) 调用selector.poll()方法进行处理 。该方法底层即为调用nioSelector.select()方法进行处理。
(4) 处理已接受完成的数据包队列—completedReceives
。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;
(5) 处理已发送完的队列—completedSends
。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;
(6) 处理断开连接的队列
。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;
在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。
KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理。
KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。
Kafka 每个 Topic 下面的所有消息都是以 Partition 的方式分布式的存储在多个节点上。同时在 Kafka 的机器上,每个
Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。
下面先介绍一下partition中的segment file的组成:
关于segment file中index与data file对应关系图,这里我们选用网上的一个图片,如下所示:
segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497
为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。
注:Partition中的每条message由offset来表示它在这个partition中的偏移量,这个offset并不是该Message在partition中实际存储位置,而是逻辑上的一个值(如上面的3),但它却唯一确定了partition中的一条Message(可以认为offset是partition中Message的id)。
message中的物理结构为:
参数说明:
关键字 | 解释说明 |
---|
8 byte offset |
在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size | message大小
4 byte CRC32 | 用crc32校验message
1 byte “magic” | 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” | 表示为独立版本、或标识压缩类型、或编码类型
4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填
K byte key | 可选
value bytes payload | 表示实际消息数据
假如我们想要读取offset=368776的message,需要通过下面2个步骤查找。
1). 查找segment file
00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770
= 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 +
1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
2). 通过segment file查找message
通过第一步定位到segment
file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
segment index
file并没有为数据文件中的每条message建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
如果就以 Partition 为最小存储单位,可以想象,当 Kafka Producer 不断发送消息,必然会引起 Partition
文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响,因此,需以 segment 为单位将 Partition 进一步细分。
每个 Partition(目录)相当于一个巨型文件,被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment
文件中消息数量不一定相等),这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 Partition
只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}
等若干参数)决定。
Kafka 的副本机制,是多个 Broker 节点对其他节点的 Topic
分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance),Kafka
每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
副本机制
注意哈,下面说的 Leader 指的是每个 Topic 的某个分区的 Leader ,而不是 Broker 集群中的【集群控制器】。
在 Kafka 中并不是所有的副本都能被拿来替代主副本,所以在 Kafka 的Leader 节点中维护着一个 ISR(In sync
Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:
另外还有个 AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合,所以公式如下:
这里先要说下两个名词:HW 和 LEO 。
当 Producer 向 Leader 发送数据时,可以通过request.required.acks
参数来设置数据可靠性的级别:
acks=1
的情况。在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:
request.required.acks
配置,是写入自己完成就响应给 Producer 成功,还是写入所有 Broker 完成再响应。这个,就是胖友自己对消息的可靠性的选择。注意噢,此处说的都是同一个 Kafka Consumer group 。
总的来说,Kafka 和 RocketMQ 的高可用方式是比较类似的,主要的差异在 Kafka Broker 的副本机制,和 RocketMQ Broker
的主从复制,两者的差异,以及差异带来的生产和消费不同。当然,实际上,都是和“主” Broker 做消息的发送和读取不是!
唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边 自动提交了 offset ,让 Kafka
以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。
这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要 关闭自动提交
offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是 可能会有重复消费 ,比如你刚处理完,还没提交
offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。
生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存
queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。
[](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Fdoocs%2Fadvanced-
java%2Fblob%2Fmain%2Fdocs%2Fhigh-concurrency%2Fhow-to-ensure-the-reliable-
transmission-of-
messages.md%23kafka-%25E5%25BC%2584%25E4%25B8%25A2%25E4%25BA%2586%25E6%2595%25B0%25E6%258D%25AE)Kafka
弄丢了数据
这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的
follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader
之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数:
replication.factor
参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。min.insync.replicas
参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系,没掉队,这样才能确保 leader 挂了还有一个 follower 吧。acks=all
:这个是要求每条数据,必须是 写入所有 replica 之后,才能认为是写成功了 。retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是 要求一旦写入失败,就无限重试 ,卡在这里了。我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行
leader 切换时,数据不会丢失。
[](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Fdoocs%2Fadvanced-
java%2Fblob%2Fmain%2Fdocs%2Fhigh-concurrency%2Fhow-to-ensure-the-reliable-
transmission-of-
messages.md%23%25E7%2594%259F%25E4%25BA%25A7%25E8%2580%2585%25E4%25BC%259A%25E4%25B8%258D%25E4%25BC%259A%25E5%25BC%2584%25E4%25B8%25A2%25E6%2595%25B0%25E6%258D%25AE)生产者会不会弄丢数据?
如果按照上述的思路设置了 acks=all
,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower
都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
Kafka 本身,并不像 RocketMQ 一样,提供顺序性的消息。所以,提供的方案,都是相对有损的。如下:
这里的顺序消息,我们更多指的是,单个 Partition 的消息,被顺序消费。
方式一,Consumer ,对每个 Partition 内部单线程消费,单线程吞吐量太低,一般不会用这个。
方式二,Consumer ,拉取到消息后,写到 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue 。然后,对于 N
个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
这种方式,相当于对【方式一】的改进,将相同 Partition 的消息进一步拆分,保证相同 key 的数据消费是顺序的。
不过这种方式,消费进度的更新会比较麻烦。
当然,实际情况也不太需要考虑消息的顺序性,基本没有业务需要。
为了保证可靠性,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给
producer。如此,可以避免因部分数据被写进 leader,而尚未被任何 follower 复制就宕机的情况下而造成数据丢失。对于 producer
而言,它可以选择是否等待消息 commit,这可以通过参数 request.required.acks 来设置。这种机制可以确保:只要 ISR
中有一个或者以上的 follower,一条被 commit 的消息就不会丢失。
有一个很重要的问题是当 leader 宕机了,怎样在 follower 中选举出新的 leader,因为 follower 可能落后很多或者直接 crash
了,所以必须确保选择 “最新” 的 follower 作为新的 leader。一个基本的原则就是,如果 leader 挂掉,新的 leader
必须拥有原来的 leader 已经 commit 的所有消息,这不就是 ISR 中副本的特征吗?
但是,存在一个问题,ISR 列表维持多大的规模合适呢?换言之,leader 在一个消息被 commit 前需要等待多少个 follower 确认呢?等待
follower 的数量越多,与 leader 保持同步的 follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。
一种常用的选举 leader 的策略是 “少数服从多数” ,不过,Kafka 并不是采用这种方式。这种模式下,如果有 2f+1 个副本,那么在 commit
之前必须保证有 f+1 个 replica 复制完消息,同时为了保证能正确选举出新的 leader,失败的副本数不能超过 f
个。这种方式有个很大的优势,系统的延迟取决于最快的几台机器,也就是说比如副本数为 3,那么延迟就取决于最快的那个 follower 而不是最慢的那个。
“少数服从多数” 的策略也有一些劣势,为了保证 leader 选举的正常进行,它所能容忍的失败的 follower 数比较少,如果要容忍 1 个
follower 挂掉,那么至少要 3 个以上的副本,如果要容忍 2 个 follower 挂掉,必须要有 5
个以上的副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这种算法更多用在
ZooKeeper 这种共享集群配置的系统中,而很少在需要大量数据的系统中使用。
实际上,leader 选举的算法非常多,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka
所使用的 leader 选举算法更像是微软的 PacificA 算法。
Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader
保持同步,只有 ISR 里的成员才能有被选为 leader
的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f+1 个副本,一个 Kafka
topic 能在保证不丢失已经 commit 消息的前提下容忍 f
个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader
复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。但与 “少数服从多数” 策略不同的是,Kafka ISR
列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。
前已述及,当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个
partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:
如何选择呢?这就需要在可用性和一致性当中作出抉择。如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR
中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。
选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经
commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即
unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略。
unclean.leader.election.enable 这个参数对于 leader
的选举、系统的可用性以及数据的可靠性都有至关重要的影响。生产环境中应慎重权衡。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。