Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。它最初由LinkedIn(领英)公司发布,使用Scala语言编写,与2010年12月份开源,成为Apache的顶级子项目 。
Kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用 。
首先得了解这个,比较简单的一个集群图
如上图,只画了其中一个,具体看看里面是什么
broker
,也就可以这样理解,集群中每一台kafka服务就是broker
在没有画出来了另外两个kafka中,我们会推选出领导
Leader
,以及追随者Follower
,这个我们后面再聊
如上图,对于消费者如何消费分片中的消息的,其中有下面几点的解释
Partition
只能由一个Consumer
来消费,一个Consumer
可以消费多个不同的Partition
。所以,我们应该保证每个主题的Partition
的数量大于Consumer
的数量
Consumer
越多,则吞吐量越高,消费得越快。当然,要结合第一点
Consumer
增加或减少时,Partition
和Consumer
的消费关系会自动调整
在上一节看到了简单的消费,那只不过是同一个group
下,接下来引入group
这个概念
上面第三节说的都不错,在这里就是要加一个前提条件,一个Partition
的消息同一个group
中的一个Consumer
来消费,
交给了同组的某个Consumer
,就不能交给同组的其他Consumer
了
每一个group
都可以完整消费主题中的所有消息
partition
里面的消息如上图,有以下几点特性
offset
越大。不同partition
的消息根据offset
无法比较新l旧
Consumer
顺序地消费partition
里的每一条消息,可以每读一条就向kafka上报(commit),当前读到了哪个位置(offset),也可以间隔性上报
offset
,比如说每读5条,上报更新一下offset
offset
,比如说每隔5s
上报更新Consumer
重启时kafka
根据该group
上一次提交的最大offset
来决定从哪个地方开始消费。
group
之间,记录的offset
是不同的,这也是上一节每个group
独立消费topic
的消息的原因
Partition
关于生产者生产消息至Partition
,有三种情况,按照优先级这样排序
Partition
进行写入
key
,再通过hash
分发器计算得到结果,来决定去哪个Partition
Partition
。比如说当前5
分钟,往Partition 0
中写入;下一个5
分钟,往Partition 1
中写入
Partition
应答ack
在上面一节,我们确定了partition
存储是哪个,接下来还有一个问题,就是如果是kafka
集群架构的话,我们会出现同个Partition
,有一个Leader
,多个Follower
。
partition
后,我们要去寻找它的Leader
Leader partition
将消息写入本地磁盘
Producer
进行应答响应Follower partition
会将消息从Leader
那拉回来,写入自己的本地磁盘
Leader
进行应答响应leader
收到所有的Follower
应答后,再向Producer
应答那么在此刻,生产消息的应答ack
有三种策略
ack
应答
Producer
只需要Leader Partition
应答即可,不用管Follower Partition
是否写入成功
partition
需要保证所有的Follower
才进行应答
Partition
备份机制在kafka
集群中,我们有Partition
的备份机制,如下
同一个主题下,集群中的每个broker
,都会维护自己的Partition
。
Leader
、Follower
,生产者的数据优先推送给Leader
Partition
都有自己的Leader
Topic
下的,不同Partition
尽量分布在不同的broker
当有leader
的broker
宕机后,kafka
集群会重新竞选那台broker
上原本是leader
的Partition
,和下面ISR
队列有关。
Partition
,一个Topic
中有多个Partition
,可以有效地避免了消息的堆积
segment
,消息在Partition
里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt
一个segment
包括一个log
文件,两个index
文件,三个文件成套出现。前面数字的文件名代表着offset
偏移量开始索引位置
Consumer
的offset
便宜量的索引文件index
,kafka
分段后的数据建立的索引文件
如下图
可以看到上面有两个索引文件,
index
文件是记录offset
消息和log
文件中消息位置的映射关系的文件
timeindex
文件时记录时间戳和offset
关系的文件
请注意,这边的索引并不会记录每一条消息的索引,而是采取稀疏索引,也就是隔一段消息才会记录消息的索引。
这个消息索引的稠密程度,影响
kafka
存储读取的速度 索引越稠密,则读取的速度越快 索引越稀疏,则文件存储的空间越大
由于上面存储文件都是采用offset
偏移量来命名,所以kafka
会采取二分查找方法,可以大大提交检索效率。
从上面架构上来看,kafka
丢失消息的原因主要可以分为下面几个场景
Producer
在把消息发送给kafka
集群时,中间网络出现问题,导致消息无法到达
Producer
消息超出大小限制,broker
收到以后没法进行存储kafka
集群接收到消息后,保存消息至本地磁盘出现异常
leader
副本限多的follower
副本成为新的leader
副本,那么落后的消息数据就会丢失。Consumer
在消费消息时发生异常,导致Consumer
端消费失败
offset
自动提交参数,enable.auto.commit=true
。消费者接受到了消息,进行了自动提交。但其实消费者并没有处理完成,就宕机了,此时kafka
认为Consumer
已经消费了这条消息了,后续便不再分配,造成了消息的丢失Producer
消息发送消息失败关于上面Producer
消息发送消息失败的解决方法,总结归纳出五种,可以结合使用
producer.send(msg, callback)
acks=all
。partition
的leader
接收到消息,等待所有的follower
副本都同步到了消息之后,才认为本次生产者发送消息成功了。
retrie>=3
,增加重试次数以保证消息的不丢失
broker
写入磁盘失败Partition
的多副本机制(建议)。使用下面的这段配置,
unclean.leader.election.enable=false
:表示不允许非ISR
中的副本被选举为leader
,以免数据丢失replication.factor>=3
:消息分区的副本个数,建议设置大于等于3
个min.insync.replicas>1
:这个值大于1
,要求leader
至少能和一个Follower
副本保证联系Consumer
消费异常消费者需要关闭自动提交,采用手动提交offset
,enable.auto.commit=false
,并在代码中写入
// 同步提交
consumer.commitSync();
// 异步提交
consumer.commitAsync();
这实际上是一个消息的幂等性问题
幂等性是指一个操作可以被重复执行,但结果不会改变的特性。在消息队列中,幂等性是指在消息消费过程中,保证消息的唯一性,不会出现重复消费的情况 。
我们有以下几个方案可以解决
insert ... on duplicate key update
,消费插入数据时,数据已存在则进行更新
如下图
取消掉两次CPU
的拷贝,从而减小CPU
的消耗。
零拷贝是操作系统提供的,如Linux
上的sendfile
命令,是将读到内核空间的数据,转到 socket buffer
,进行网络发送
还有Java NIO
中的transferTo()
方法
在kafka
的broker
中,主题下可以设置多个不同的partition
,而kafka
只能保证Partition
中的消息时有序的,但没法保证不同Partition
的消息顺序性。
比如说,有一个主题Topic A
,里面有两个Partition
,但消费端只有一个Consumer
。根据上面的架构可以知道,这个Consumer
会消费两个Partition
中的消息,这样就肯定会出现消费乱序的情况。
那么针对上面这种乱序的情况,我们可以这样进行设置
Partition
,这样所有的消息也就只会发送到一个Partition
中,也就保证了消息的顺序性。
Producer
也可以指定往一个partition
中发送消息。具体可以查看第二章第6节Partition
只能被一个Consumer
消费,也可以保证消息的有序性消费。但也要避免Rebalance
,原本一对一好好的,Consumer
宕机或者下线导致Rebalance
就会导致消费的乱序。
主要原因有下面几个
CPU
拷贝,减少了CPU
的消耗
Partition
,有效避免了消息的堆积segment
,消息在Partition
里面,消息是分段来进行存储的,每次操作的消息读写都是针对segmengt
index
,kafka
分段后的数据建立的索引文件,就是第二章第9节的文件存储结构kafka
是直接操作的page cache
,而不是堆内对象,读写速度更高。且进程重启后,缓存也不会丢失ISR
,它有什么用在kafka
中,除了有ISR
,还有OSR
,AR
,功能如下
kafka
中,当一个broker
宕机挂掉的时候,原本在其broker
的Leader Partition
会重新进行竞选。这个竞选基本从ISR
队列中选举。那么现在可以这样说,ISR
是一个维护了Follower Partition
的队列,其中的Partition
都与Leader Partition
消息保持一致。
ISR
队列中的其他Follower Partition
组成的队列
Follower Partition
,也就是ISR
和OSR
的Partition
总和
Rebalance
是指Partition
与Consumer
之间的关系需要重新调整分配,这个重新调整分配的动作称为Rebalance
。
那么当出现下面几种情况的时候,会触发Rebalance
Group
中的Consumer
新增后
Group
中的Consumer
离开后,比如说宕机
Topic
下的Partition
数量发生变化后
总之,两边的关系数量发生变化的话,都会触发Rebalance
当出现上面这种情况的时候,要么就是Consumer
挂掉了或者消费水平太低,要么就是Producer
消息太多,间接导致Consumer
消费不及时。
针对上面这种情况,我们可以有以下的解决方案,可以结合使用
Consumer
的数量,可以通过增加消费者组中的Consumer
数量或者增加Consumer
实例来实现。这样每个Consumer
可以并行处理消息,提高整体消费能力。
Partition
分区数量,在kafka
中,可以设置主题下的Partition
,将消息分散至更多的Partition
中,配合第一点方案提高整体的消费能力
Consumer
的消费能力,优化消费者的处理能力,确保Consumer
能够快速处理每条消息。将Consumer
处理消息的速度优化至高于Producer
生产消息的速度。在不破坏代码业务逻辑的情况下,也可以使用异步处理来消费消息。
在面试过程中,第三点方案是至关重要的,很多企业由于硬件资源的原因,没有增加Consumer
的数量,没有增加Partition
数量的空间。故此,Consumer
优秀的消费能力,就成了他们考察的目标了。
我是半月,你我一同共勉!!!