kafka基本结构
主题
kafka将消息抽象归纳一个主题,一个主题就是对消息的一个分类,生产发送消息到特定主题,消费者订阅主题进行消费
消息
消息是kafka通信的基本单位,由一个固定长度的消息头和一个可变长的消息体构成
分区和副本
kafka经一组消息归纳为一个主题,每个主题有被分为多个分区,每个分区在物理上对应为一个文件夹,分区编号从0开始,每个分区又有一到多个副本,分区的副本分布在集群的不同代理,以提高可用性,
kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息有序性,每个消息是顺序写到磁盘中的,因此效率很高
leader副本和follower副本
由于副本的存在,就需要保证一个分区的多个副本之间数据的一致性,kafka会选择该分区的一个副本作为Leader副本,而该分区其他副本为follower副本,只有leader副本处理客户端的读写请求,follower副本从leader副本同步数据.
偏移量
任何发布到分区的消息会直接追加到日志文件的尾部,每条消息在日志文件的位置都会有一个按序递增的偏移量,偏移量是一个在分区下严格有序的逻辑,但是并不代表在磁盘上有序,消费者可以通过控制偏移量来对消息进行消费,如消费者可以指定起始偏移量,为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也许要保存。需要注意的是消费者对消费偏移量的操作并不会影响消息本身的偏移量。
日志段
一个日志被划分为多个日志端,日志段是kafka日志对象分片的最小单位,与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件,日志文件是以.log文件后缀的数据文件,用于保存消息实际数据,两个索引文件分别以.index,和.timeindex作为文件名后缀,分别代表消息偏移量索引文件,和消息时间戳索引文件.
代理
在kafka基本体系架构中我们提到了kafka集群,kafka集群就是有一个或多个kafka实例构成,我们把每一个kafka实例称为代理,也叫代理kafka服务器,在生产环境中kafka集群一般包括一台或者多台,每一台服务器上配置一个或多个代理,每一个代理都有唯一的表示id,这个id是非负整数,
生产者
就是生产消息,向代理发送消息,也就是向kafka代理发送消息的客户端
消费者和消费组
消费者拉取的方式拉取数据,他是消费的客户端,每一个消费者都属于一个消费组,我们可以为每个消费者指定一个消费组,如果没有指定就会属于一个默认的消费组,每个消费者也会有一个全局唯一的id,如果没有指定就kafka默认指定一个,同一个主题的一条消息只能被同一个消费组的某一个消费者消费,但不同的消费组的消费者可以同时消费消息,消费组是kafka实现对一个主题消费进行广播和单播的手段,实现广播只需指定各个消费者属于不同的消费组,消费单播则只需让各个消费者属于一个消费组就行
ISR
kafka在zookeepr中动态维护一个ISR,即保存的是同步的副本列表,该列表中保存的是与leader副本保持消息同步的所有副本对应的代理id,如果副本宕机或者落后太多,就会动态的从ISR列表中移除.
zookeeper
kafka利用zookeeper保存响应元数据信息,kafka元数据信息包括如代理节点信息,kafka集群信息,旧版消费者信息及其消费偏移量信息,主题信息,分区状态信息,分区副本分配方案信息.动态配置信息等,kafka通过监听机制监听节点元数据的变化,从而由zookeeper负责管理维护kafka集群,同时可以通过zookeeper很方便对kafak集群进行水平扩展以及数据迁移
kafka特性
kafak高度依赖于文件系统来存储和缓存消息,我们普遍认为磁盘读写慢,其实并不一定,关键是我们如何使用他,且操作系统提供了预读和延迟写技术,使得磁盘并不是很慢,由于与kafka是基于JVM的,而java对象内存消耗较大,却java对象增加jvm的垃圾回收也频繁和繁琐,基于上面原因kafka使用文件系统和依赖页缓存的存储比维护一个内存的存储或其他结构来存储消息更有优势,因此kafka选择文件系统存储数据。
基于消息系统本身的作用考虑,数据的持久化可以建立在简单对文件进行追加的实现方案上,因此顺序追加,所以kafka在设计上是采用时间复杂度O(1)的磁盘结构,他提供常量时间的性能,即使数据存储TB级数据,性能和数据的大小关系也不大
正如kafka将消息持久化,当机器宕机重启的时候,消息不会丢失
依赖zookeeper来对集群进行协调管理,使得kafka更加容易扩展
支持java,scala,c ,c# ,Erlang等多种客户端
在0.10版本引入了kafka stream,他是一个基于java实现用于流处理jar文件,
为每个主题建立分区,每个分区有一个或多个副本,对数据进行持久化备份
kafka代理无状态,即代理不记录消息是否消费,消息偏移量的管理由于消费者自己或组协调器来维护,集群本身几乎不需要生产者和消费者的状态信息
支持Gzip,Snappy,LZ4这3种压缩方式,可以将多条消息放到一起组成messageSet,然后放到一条消息里面去,从而提高压缩比率而提高吞吐量。
kafka使用场景
延迟操作组件
kafka将一些不立刻执行而要等待满足一定条件才触发完成的操作称为延迟操作,这类操作抽象为一个抽象类DelayedOperation,他是一个基于事件启动有失效时间的TimeTask,TimeTask实现了Runable接口,维护了一个TimerTaskEntry对象,TimerTaskEntry绑定了一个TimerTask,TimerTaskEntry被添加到TimerTaskList中,TimerTaskList是一个环形双向链表,按失效时间排序
DelayedOperationPurgatory
DelayedOperationPurgatory是一个对DelayOperation管理的辅佐类,他以泛型的形式将一个DelayedOperation添加到内部维护的Pool[Any,Watchers]类型watchersForKey对象中,同时将DelayedOperation添加到SystemTimer中,
其中watchers是Purgatory的内部类,底层是一个ConcurrentlinkedQueue,定义了ConcurrentLinkQueue类型的属性operation属性,用于保存DelayedOperation,其实就是对DelayedOperation进行监控,
Purgatory就是根据watchers对DelayedOperation进行管理,通过watchers调用DelayedOperation响应的完成,让DelayedOperation在delsyMs时间内完成,或者超时
DelayedProduce
DelayedProduce(delayMs:long,produceMetadata:ProduceMetedata,
replicaManager:ReplicaManager
responseCallback:Map[TopicPartition,PartitionResponse]=>Unit)
DeplayedProduce是协助ReplicaManager完成相应延迟操作的,而ReplicaManager的主要功能负责生产者将消息写入Leader副本,管理Follower副本与Leader副本之间的同步以及副本之间的角色之间转换,DeplayedProduce显然是在生产者发送消息相关的延迟操作,因此只可能在消息写入到Leader副本时需要DelayedProduce的协助
当ProduceRequest的ack为-1的情况下,会创建一个DelayedProduce对象,而生产者在发送消息的时候,会调用ReplicaManager.appendMessage().追加消息到相应的leader分区之中,此时我们知道ack=-1。意味着生产者需要等待该分区的所有消息都与Leader副本同步之后才会进行下一条消息的发送,若要控制在分区个Follower副本和Leader副本同步完成只有在向生产者应答,就要发挥DelayedProduce的作用了
因此就可以看出DelayedProduce的作用就是协助副本管理器在ack=-1的场景下,延迟回调responseCallBack向生产者做出响应,具体就是当消息在分区的各个Follower副本完成了和Leader副本消息同步之后回调responCallBack给生产者。
DelayedFetch
DelayedFetch就是在FetchRequest处理时进行的延迟操作,而在kafka中只有消费者或Follower副本会发起FetchRequest,FetchRequest是由KafkaApis,handleFetchRequest方法处理,其中会调用ReplicaManager.fetchMessage()方法从相应分区的Leader副本拉去消息,在fetchMessage方法中创建DelayedFetch延迟操作。在拉取消息时候需要延迟操作,是为了本次拉取足够的数据。
DelayedJoin
DelayedJoin是组协调器在消费组准备平衡操作时候进行相应的处理,当消费组的状态转换为PreparingRebalance时候,即准备平衡操作,在组协调器的prepareRebalance(),方法中会创建一个DelayedJoin对象,并交给DelayedOperaionJoin负责监控
之所有加入DelayedJoin,是为了组协调器等待当前消费组下所有的消费者都请求加入了消费组,即发起JoinGroupRequest请求,每次组协调器处理完JoinGroupRequest时都会检测DelayJoin是否满足了完成执行条件
DelayedJoin响应方法的实现是调用GroupCoordnator相关方法来完成,Delayed.tryComplete调用GroupCoordinator.tryCompleteJoin方法,该方法判断是否还有未申请加入消费组的消费者,若所有消费者均申请加入了消费组,则表示DelayedJoin满足le完成执行的条件,否则继续等待,直到满足执行条件或超时。
DelayedHeartbeat
DelayedHeartbeat用于协助消费者与组协调器心跳检测相关的延迟操作。DelayedHeartbeat相关功能的实现是调用GroupCoordinator的响应方法来完成.
DelayedCreateTopics
在创建主题时候,需要为主题的每个分区分配到Leader之后,才会回调函数将创建主题结果返回客户端,DelayCreateTopic延迟操作等待主题的所有分区副本分配到Leader或是等待超时后调用回调函数返回到客户端;
控制器
在启动kafka集群中,每一个代理都会实例化并启动一个kafkaController,并将代理的brokerId注册到zookeeper相应的节点中,kafka集群会在各个代理中选举出一个代理作为Leader控制器,即Leader控制器,当控制器宕机的时候会重新选举新的leader控制器,控制器只要作用就是主题的创建和删除,分区和副本的管理以及代理故障转移,当一个代理被选举成控制器的时候,该代理的kafkacontroller就会注册控制器相应的权限,同时标记自己就是leader,当代理不再是控制器的时候,就要注销掉相应的权限,
{
"leader":"leader对应brokerId",
"leader_epoch":"leader更新次数",
“isr”:"isr列表"
}
优先副本,就是AR中第一个副本称为preferred replica,也就是我们说的优先副本.理想情况下,优先副本即该分区的leader。
控制器初始化
到此,控制器实例化过程结束,当一个代理启动就会创建一个kafkaController实例并启动,在启动kafakcontroller时,先注册一个用于监听zookeeper回话超时的监听器,sessionExpirationListener,然后启动控制器选举,让当前代理试图竞选控制器。
控制器选举过程
每个代理首先会从zookeeper中获取leaderid的信息,解析当前leader的LeaderId,若leaderId=-1,表示还没有节点成功当选leader,则将自身节点信息写入zookeeper中,如果leader!=-1表示已经有代理成为leader,
在抢占/controller节点时候,若出现已存在异常,就获取leader且更新内存的leader的值,否则将leader设置为-1,且删除临时节点.这个时候就会触发重新选举leader.同时节点变化就会触发leaderChangeListener.handleDataChange方法,这时其他代理将通过当前的leaderId和自己的brokerid比较,如果自己是之前的leader,而现在leaderId和自己的brokerid不一样,则自己退位,回调onControllerResignation函数.
故障转移
可以触发控制器进行选举的有三种情况
故障转移其实就是控制权的转移就是重新选出新的控制器,而控制器实例化创建一个zookeeperLeaderElector对象,实例化需要对象需要回调两个函数,分贝是是当选的控制器进行初始化,以及注册响应权限的onControllerFailVover方法,和注销相应权限的onControllerResignation方法,
onControllerFailover操作
以上工作主要是对完成相应元数据的初始化以及对代理,主题,分区等变化感知的监听器的注册,和启动相应管理组件
oncontrollerResignation操作
分区平衡
分区自动平衡是通过分区的优先副本选为分区的leader,通常当分区副本是通过kafka自动分配,会保证分区副本分配在不同的代理节点,即使用优先副本的第一个副本当做leader,这样的分配是一个相对平衡的状态,但是随着时间推移,部分节点变化导致重新选举分区leader,此时优先副本发生故障,然而使用其他副本选举为leader,之后即使优先副本恢复,也不能成为leader.因此我需要进行自动平衡,具体步骤如下
协调器
kafka有三种协调器,消费者协调器,组协调器,任务管理协调器,
kafka高级消费者是强依赖zookeeper,每一个消费者在启动的时候都会在zookeeper对应的路径下,注册监听器,当节点发生变化的时候,消费者进行平衡操作,由于这种方式,当消费组的任何一个消费者发生变化,同一个组的消费者都会进行平衡操作,而消费者之间并不知道其他消费者的状态,回导致kafka工作在一个不正确的状态,同时这种方式完全依赖zookeeper,以监听的方式管理消费者存在两个缺陷
由于上面原因,新版kafka引入了消费者协调器,负责同一个消费组下各个消费者与服务端组协调器之间的通信,服务端引入了组协调器,用于管理部分消费组和该消费组下每个消费者的消费偏移量
消费者协调器
消费者协调器是kafkaconsumer的成员变量,使用他和组协调器进行通信,且是消费者私有的,因此只有对应的消费者才可见,不同消费者不可见,可以简单理解是消费者的一个代理,但是又不是代理,消费者协调器是消费组管理相关请求的发起者
消费者协调器主要责任如下
当消费者协调器向组协调器请求加入的消费组后,组协调者会为同一个组下选出一个leader,leader的消费者的协调器负责消费者与分区的分配,会再给组协调器一个请求,这个请求回到有分配的结果,组协调器会把分配的结果再返回给follower消费者的协调器,而非leader也会有一个请求,但是这个请求中的分配结果是空的, 这种的方式,将分区分配的职责交个客户端自己处理,从而减轻服务端的负担
组协调器
组协调器负责对管理的组员提交的相关请求进行处理,组员即消费者,他负责管理与消费者之间建立连接,并从与之连接的消费者之中选出一个消费者作为leader消费者,同时管理与之连接的消费者偏移量的提交,每个消费者消费偏移量保存到kafka的内部主题中,并通过心跳来检测消费者与自己的连接状态。
消费者加入组的过程
消费偏移量管理
新版kafka将消费偏移量保存到kafka一个内部主题中,当消费者正常运行或者进行平衡操作时候向组协调器提交当前的消费偏移量.组协调器负责消费组的管理和消费偏移量管理,但客户端可以仅仅选择让组协调器管理偏移量,如客户端指定了分区的时候,就不需要kafka负责分区的分配了
当组协调器收到偏移量的提交请求时候,会检查是否满足以下条件
如果都不满足提交的条件就会调用回到函数返回响应的错误码
具体过程如下
网路通信
kafkaserver启动时候,初始化一个socketServer服务,用于接受客户端连接,处理客户端请求,发送响应,同时创建一个kafkaRequestHandlerPool用于管理kafkaRequestHander,
SockerServer是基于java NIO实现的网络通信组件,线程模型是一个Acceptor线程负责接受客户端所有的连接,N个processor线程,每个processor有多个selector,负责从每个连接中读取请求,M个hander线程处理请求,并将产生的请求返回给processor,而handler是由kafkaRequestHanderPool管理,在processor和hander之间通过requestChannel来缓存请求,每个hander从requestChannel.requestQueue接受RequestChannel.Request.把Request交由KafkaApi的hander处理,然后处理后把对应的response存进RequestChannel.responseQueue队列。
Accept
他主要是监听并接受客户端连接的请求,建立和客户端的数据传输通道serverSockerChannel,然后为客户端指定一个processor
他接受客户端新的连接,创建sockerChanel,以轮询的方式交由processor处理,添加到processor的newConnections队列并唤醒processor线程,这样就建立了客户端与kafkaserver之间的通信通道
Processor
他也是一个线程类,主要用于客户端读取请求数据和将响应结果返回给客户端RequestChannel
他是为了给processor线程与handler线程之间通信提供数据缓存,是通信过程中Request和Response缓存的通道,是processor线程与hander线程交换数据的地方
SocketServer
socketServer启动就可以通过Acceptor接口客户端请求,交由Acceptor对应的Processor处理,Processor线程将请求添加到RequestQueue队列中,Hander从RequestQueue取出请求分发处理,然后将结果存入responseQueue队列中,添加response时会唤醒与之对应的processor,Processor从RequestChannel.responseQueues队列中取出自己对应的responseQueue队列根据ResponseAction进行相应处理。
日志管理器
日志管理器是kafka用来管理所有日志,包括日志的创建,删除,日志检索,日志加载和恢复,检查点,以及日志文件刷写磁盘,日志清理。
在kafka中,每个主题之间互相独立,每个主题在逻辑上由一个或多个分区构成,分区树可以在创建主题的时候创建,也可以在主题创建后在修改,但只能增加一个主题的分区数,而不能减少分区数,
存储结构上分区的每个副本在逻辑上对应一个log对象,每个log有划分为多个LogSegment,每个LogSegment包括一个日志文件和两个索引文件,一个索引文件是偏移量索引文件,另一个是时间戳索引文件,每个log对象中维护了一个concurrentSkipListMap,其底层是一个跳跃表,保存主题的每个分区对应的所有logsegment,kafka将日志文件封装成一个FileMessageSet对象,两个索引文件分别封装成OffsetIndex,和TimeIndex对象,
如最上面图显示,分区对应的目录的命名规则为主题名-分区编号,分区编号从0开始顺序递增,分区编号最大值为分区总数键1,数据文件的命令规则是由数据文件第一条消息的偏移量(基准偏移量),左补0构成20位数字字符组成,每个分区第一个数据文件的基准偏移量为0,因此每个分区的第一个日志文件为000000000000000000000.log,索引文件的命名是一样,如000000000000000000.index,0000000000000000000.timeindex.
如我创建一个主题log-format,且分区为3,副本为2,其分布如下图
消息组成部分
每条消息有一个固定长度的消息头和一个可变长度的payload组成,payload也称为消息体,上图是消息结构的说明,在实际存储的消息总长度还包括12字节额外的开销,这个12字节包括两部分,其中8个字节代表消息的偏移量,另外4字节代表消息的总长度,因此一个当前版本kafka一条消息固定长度为34字节
数据文件
下图为一个数据文件的部分内容
可以看到,offset=1与offset=0的消息的position只差为35,用35减1(payloadSize)为消息的固定长度34,
数据文件存储的消息,由于是第一个数据文件,因此起始偏移量为0,上面内容的postition代表的是在数据文件中的实际位置,之后每条消息的position为前一条消息的postion于消息固定长度和消息总长度之和,CreateTime表示消息时间类型为消息创建时间,isValid表示消息CRC校验是否合法,payloadSize表示消息实际长度,CompressCodeec表示消息压缩方式,crc表示消息的crc32校验和,payload表示消息的实际内容
偏移量索引文件
本来在kafka是将消息分段保存在不同的文件中,同时每条消息都一个唯一的偏移量,数据文件已该文件基准偏移量左补0命名,并将每个日志段以基准偏移量key保存到concurrentSkipListMap中,这样查找指定偏移量的消息时候,用二分法找到消息所在的段文件,但是为了进一步提高查找效率,kafka为每个数据文件创建了一个基于偏移量的索引文件,该索引文件文件名和数据文件相同,后缀为index,
如果我们要查找指定偏移量为23消息,如下步骤
时间戳索引文件
该时间戳索引文件和对应的数据文件名称一样,以timeindex为后缀,该索引文件包括一个8字节的时间戳字段,和一个4字节的偏移量字段
如果我们要查找时间戳为1557554753430的消息
日志清理
kafka提供了两种策略,日志删除,和日志压缩,通过参数cleanUp.policy指定日志清除策略,可以控制到主题级别,主题级别策略会覆盖代理级别的配置策略
日志删除
在日志管理器启动有一个定时任务线程用于定时的删除日志段文件,默认是5分钟执行一次,可以通过${log.retention.check.interval.ms}设置
kafka提供了基于日志保留时长和日志段大小两种日志删除配置方式
日志保留时长
默认是7天,可以通过log.retention.minutes设置,要主要的是查找保留时长的日志段文件,并不是剪短的依据日志单最晚更新时间,他并不能代表真正反映日志单在瓷片的保留时间,如分区副班重分配是后该日止更新时间会被修改
因此最长时间是通过查询日志分段的时间戳所以你文件,查到到时间戳索引文件中最后一项索引项,若索引项的时间戳字段大于0,就取改值,否则去最近修改时间
在计算出日志最长时间后,从最早日志段文件依次扫描直到第一个不满足超时条件的段文件,查找到要删除的文件,若删除的日志单总数等于该分区日志段的总数,说明所有日志段均过期,但该分区下至少要有一个日志段接受消息的写入,因此,需要切分一个新的日志段,然后迭代删除待删除的日志段文件,
删除的过程如下
基于日志大小
日志删除任务会检查当前日志大小是否超过设定值,通过log.retention.bytes设置,
删除过程如下
日志压缩
这种策略是一种更细粒度的清理策略,他是基于消息的key,通过压缩每个key对应的消息只保留最后一个版本的数据,该key对应其他版本在压缩时会被清除,类似数据库的更新操作,压缩策略将可key对应值为空的消息,认为是直接删除该消息,为了不影响日志追加操作,日志压缩并不会在活跃段进行操作,同时对非活跃段压缩也不是一次性执行,而是分批进行
需要注意将日志清理与日志删除区分开,日志删除是删除整个日志段,而日志清理是将相同key的日志进行合并,只保留key最后一个值,将合并后的数据构成新的日志段,同时删除原来的日志段,
副本管理器
kafka 0.8 版本引入了副本管理器,引入副本机制使得kafka能够在整个集群中只要保证至少一个代理存活就不会影响整个集群
kafka存活的条件
leader副本会跟踪所有同步的节点,一旦一个节点宕机,卡主,或者延迟太久,leader副本就会将该节点从同步副本集合列表中移除,
如何判断代理卡主或者下线
副本的数量可以通过以下方式配置(默认副本数是1)
副本管理器负责副本管理,主要包括对控制器发送的leaderAndIsrRequest指定,stopReplicaRequest指令以及UpdateMetedataRequest指令进行处理,维护副本ISR变化,以及Follower与Leader数据同步的管理
首先我们现在先介绍一下分区和副本的相关知识
分区
kafka将一个主题在逻辑上分成一个或多个分区,每个分区在物理存储上对应一个目录,目录名为${topicName}-${partionId}其中topicName为主题的名称,partionId是分区编号,每个主题的分区都有一个唯一的编号,从0递增.分区数可以大于节点数,但是副本数不能大于节点数,因为副本需要分布在不同的节点上,这样才能达到本分的目的。
如上图所示每个分区,在存储结构上有LEO和HW两个重要的概念
LEO是log end offset的缩写,表示每个分区最后一条消息的位置,分区每个副班都有自己的LEO
HW是Hight Watermark的缩写,将一个分区对应的ISR中最小的LEO作为HW,HW之前的消息表示已提交的消息,对消费者是可见的,消费者最多只能消费到HW所在的位置,HW之后的消息表示还没有被Follower副本同步完成,每个副本都有自己的HW,副本leader和follower各自负责更新自己HighWatermark状态,Follow.hw<=leader.leo
对于分区leader副本,leo和hw的存储结构示意图如下
每个主题的一个分区只能被同一个消费者下的其中一个消费者消费,因此我们说分区是消费并行度的基本单位,
副本
一个分区可以有一个或多个副本,副本根据是否接受读写其请求,分为Leader副本和follower副本,一个分区有一个leader副本,有0个或多个Follower副本,leader副本处理分区的所有读写请求并维护自身及Follower副本的状态信息,如LEO,HW,等副本作为消费者从leader副本拉去消息进行同步,当leader失效时候,通过分区leader选举器从副本类表中选出一个副本作为新的leader.
如果brokerid与当前代理brokerId相同时候,我们将该副本叫为本地副本,否则叫远程副本,副本抽象一个Relica对象,这个对象的属性包含了主题,分区,代理编号,还有LEO,HW,副本追加数据的log,以及上次与leader同步的时间,因此还有logEndoffsetMetadata,higheWatermarkMetadata,Log和lastCaughtUpTimeMsUnderlying属性字段,对于远程副本而言log字段对应的值为null,因为远程部分log并不在当前代理上,logEndoffsetMetadata表示已追加到log的最新对应的偏移量,不过本地副本和远程副本获取此字段方式不同,远程副本由于log属性为空,因此并不能直接从本地获取,而该字段的值是由远程副本对应的代理发送请求进行更新,对于Follower副本highWatermarkMetadata的值是从Leader副本获取更新.
副本过期检查
follower把leader的LEO之前的日志全部同步完成时候,则认为follower已经赶上了leader,此时会以当前时间更新该副本的lastCaughtUpTimeMs字段,kafka副本管理器会启动一个过期检查的定时任务,这个任务会定期检查当前时间与副本lastCaughtUpTimeMs之差是否超过replica.lag.time.max.mx值,如果大于,则会把这个副本提出ISR副本集合
假设最右侧的foller副本被提出ISR集合,这个分区的HW就会发生变化变成3.
追加消息
副本管理器用appendmessage方法将消息写入副本的处理逻辑如下
appendMessages(timeout:Log,
requiredAcks:Short,
internalTopicsAllowed:Bollean,
messagePerPartition:Map[Topicpartition,MessageSet],
responseCallback:Map[TopicPartition,PartitionResponse]=>Unit)
拉去消息
副本数据同步过程
在初始状态下,leader和follower的HW和LEO都是0,leader副本会保存remote leo,表示follower的LET为0,这个时候,producer没有发送消息,follower会不断向leader不断的向leader发送fetch请求,但是因为没有数据,这个请求就会被寄存,在指定时间内(replica.fetch.wait.max.ms)后会被强制完成,如果在指定时间内,有消息发送过来,就会唤醒Fetch请求,让leader继续处理
数据的同步分为两种情况
第一种情况同步不走如下
第二种情况
leader接受到新的数据,当leader收到请求会唤醒处于阻塞Fetch请求,处理过程和第一种基本一样,
我们知道min.insync.relicas=1,是表示需要多少个副本同步才能表示消息是提交的,默认值是1(server.properties中配置,并且acks=-1(表示需要所有副本确认,此值参数才生效),这个参数提高了数据了可靠性,
要说明的是acks对应的值说明
当min.insync.replicas=1的时候,一旦消息写入leader端本地就认为已提交,而延迟一轮fetch更细HW值的设计是的follower的hw值是异步延迟更新的,如果此时leader宕机,成为新的leader的follower的hw就可能是过期的,则认为成功提交的消息被删除
那么kafka有没有解决数据丢失的解决方案呢
kafka0.11.0.0引入了leader epoch来解决这个问题
epoch实际上就是一对值(epoch,offset),epoch带包leader的版本,从0开始递增,当leader发送变化,epoch+1,而offset则对应这个epoch版本的leader写入第一条消息的offset
比如,有两个leader epoch<0,0>和<1,120>那么第一个leader epoch版本号是0,这个版本的leader从位移0开始保存消息,一共保存了120条消息,之后,leader发生变更,版本号增加到1,新版本的起始位移是120。
当引入leader epoch机制后,Follower副本B重启回来之后,需要向A发送一个特殊的请求获取leader 的leo值,follower发现leo值不比他自己的leo小,且缓存中epoch的起始位移没有大于这个leo的值,因此副本不需要截断日志,这样就会丢失数据
但是当leader此时宕机,follower副本成为了leader,同样的原leader重启回来之后,也会向原follower副本获取leo的值,发现获取的leo的值和不比自己的leo值小,因此原leader不会进行截短日志,同时在更新元副本的缓存leader epoch的值,这样就能保证数据的丢失。