After a request enters the `KafkaApis::handle` method, it is dispatched according to its `requestId`, and `ProduceRequest` is no exception.
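For orientation, here is a minimal, self-contained sketch of that dispatch-by-requestId pattern. The names and handler bodies are simplified stand-ins, not the actual `KafkaApis` source; only the api key values (PRODUCE = 0, FETCH = 1, METADATA = 3) follow the Kafka protocol.

```scala
object DispatchSketch {
  // Stand-in request keys; values follow the Kafka protocol api keys.
  object RequestKeys {
    val ProduceKey  = 0
    val FetchKey    = 1
    val MetadataKey = 3
  }

  case class Request(requestId: Int, payload: String)

  // Every request funnels through one handler and is routed by its requestId,
  // the same way KafkaApis::handle routes a ProduceRequest to handleProducerRequest.
  def handle(request: Request): Unit = request.requestId match {
    case RequestKeys.ProduceKey  => handleProducerRequest(request)
    case RequestKeys.FetchKey    => handleFetchRequest(request)
    case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
    case other                   => sys.error(s"unsupported api key: $other")
  }

  def handleProducerRequest(r: Request): Unit      = println(s"append: ${r.payload}")
  def handleFetchRequest(r: Request): Unit         = println(s"fetch: ${r.payload}")
  def handleTopicMetadataRequest(r: Request): Unit = println(s"metadata: ${r.payload}")
}
```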
The Controller watches the `/brokers/topics` node in ZooKeeper; once the information for a new topic is written there, the Controller starts creating that topic. On the brokers, `ReplicaManager::becomeLeaderOrFollower` is what ultimately creates or switches the Leader or Follower role for each partition.

To become a Leader, any fetcher left over from the partition's follower days is removed first, and then the partition switches roles:

```scala
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
partition.makeLeader(controllerId, partitionStateInfo, correlationId)
```

The most important step inside `makeLeader` is `leaderReplica.convertHWToLocalOffsetMetadata()`, which produces the new high watermark on the Leader replica.
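Why this step matters: while a replica is a follower, it tracks the high watermark only as a bare message offset; the leader keeps it with full log-offset metadata (segment base offset and position within the segment), so on becoming leader the offset has to be re-resolved against the local log. A rough sketch of that idea, using hypothetical simplified types (`LogOffsetMetadata` here is a stand-in, not the real class):

```scala
// Rough sketch of the idea behind convertHWToLocalOffsetMetadata (hypothetical simplified
// types): a follower only tracks the high watermark as a bare message offset; on becoming
// leader, that offset is re-resolved against the local log into full offset metadata.
case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = -1L,
                             relativePositionInSegment: Int = -1)

class ReplicaSketch(resolveInLocalLog: Long => LogOffsetMetadata) {
  // as a follower, only messageOffset is meaningful here
  var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(messageOffset = 0L)

  // on becoming leader: attach segment/position information from the local log
  def convertHWToLocalOffsetMetadata(): Unit =
    highWatermarkMetadata = resolveInLocalLog(highWatermarkMetadata.messageOffset)
}
```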
To become a Follower, `partition.makeFollower(controllerId, partitionStateInfo, correlationId)` is called, the old fetchers are removed, the local log is truncated back to the high watermark, and new fetchers pointing at the current Leader are added:

```scala
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
```

From then on the fetcher thread keeps sending `FetchRequest`s to the Leader to pull new messages.
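The fetch loop itself is conceptually simple. Below is a minimal sketch of the idea with hypothetical simplified types (`FetcherLoopSketch`, `FetchedData`); the real loop lives in `ReplicaFetcherThread`.

```scala
// Minimal sketch of what the follower's fetcher thread does conceptually
// (hypothetical simplified types, not the actual ReplicaFetcherThread code).
import scala.collection.mutable

case class TopicAndPartition(topic: String, partition: Int)
case class FetchedData(messages: Seq[String], nextOffset: Long)

class FetcherLoopSketch(fetchFromLeader: Map[TopicAndPartition, Long] => Map[TopicAndPartition, FetchedData]) {
  private val fetchOffsets = mutable.Map.empty[TopicAndPartition, Long]

  // start from the offset the local log was truncated to (the high watermark)
  def addPartition(tp: TopicAndPartition, initialOffset: Long): Unit =
    fetchOffsets(tp) = initialOffset

  // one iteration of the loop: build a FetchRequest covering every assigned partition
  // at its current fetch offset, append what comes back, advance the fetch offset
  def doWork(): Unit = {
    val response = fetchFromLeader(fetchOffsets.toMap)
    response.foreach { case (tp, data) =>
      // the real thread appends data.messages to the local log here
      fetchOffsets(tp) = data.nextOffset
    }
  }
}
```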
Back in the handling of the `ProduceRequest`, the `authorizer` is consulted first to see which parts of the RequestInfo fail authorization:

```scala
val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition {
  case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic))
}
```

If none of the RequestInfo passes authorization, the data in the request is not processed at all:

```scala
sendResponseCallback(Map.empty)
```
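The same split-then-answer shape can be shown in isolation. In the sketch below, `authorized`, `PartitionStatus`, and `handleProduce` are simplified stand-ins rather than the broker's real types; the point is only that unauthorized partitions are answered without ever touching their data.

```scala
object AuthorizeSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  sealed trait PartitionStatus
  case object Appended      extends PartitionStatus
  case object NotAuthorized extends PartitionStatus

  // stand-in for authorize(session, Write, Resource(Topic, ...))
  def authorized(topic: String): Boolean = topic != "forbidden-topic"

  def handleProduce(data: Map[TopicAndPartition, Seq[String]]): Map[TopicAndPartition, PartitionStatus] = {
    val (authorizedInfo, unauthorizedInfo) =
      data.partition { case (tp, _) => authorized(tp.topic) }

    // unauthorized partitions only ever get an error status; when nothing is authorized
    // this degenerates to answering the request without processing any data at all
    unauthorizedInfo.map { case (tp, _) => tp -> (NotAuthorized: PartitionStatus) } ++
      authorizedInfo.map { case (tp, _) => tp -> (Appended: PartitionStatus) }
  }
}
```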
Otherwise the `replicaManager` takes over and performs the actual write.

(figure: handlerequest.png)
`ReplicaManager::appendMessages` writes the messages to the local log file (the write only advances the LogEndOffset; the HighWaterMark is not updated yet, so consumers cannot see these messages at this point) and then, depending on the ack setting the client requested, waits for the write to reach the replicas.
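A rough sketch of that append-then-maybe-wait decision follows; the signature and helper names are simplified stand-ins for `ReplicaManager::appendMessages`, not the real API.

```scala
object AppendSketch {
  // Simplified sketch: write to the local log first, then only block on replication
  // when the producer asked for acks = -1 (all). Hypothetical types and signatures.
  case class TopicAndPartition(topic: String, partition: Int)
  case class AppendResult(lastOffset: Long)

  def appendMessages(requiredAcks: Short,
                     messages: Map[TopicAndPartition, Seq[String]],
                     appendToLocalLog: Map[TopicAndPartition, Seq[String]] => Map[TopicAndPartition, AppendResult],
                     waitForReplicas: Map[TopicAndPartition, AppendResult] => Unit,
                     respond: Map[TopicAndPartition, AppendResult] => Unit): Unit = {
    // 1. local write: this only moves the LogEndOffset, not the HighWaterMark
    val localResults = appendToLocalLog(messages)

    if (requiredAcks == -1 && messages.nonEmpty) {
      // 2a. acks = all: park the response as a delayed operation until the ISR catches up
      waitForReplicas(localResults)
    } else {
      // 2b. acks = 0 / 1: the local write is enough, answer the producer right away
      respond(localResults)
    }
  }
}
```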
1. Waiting for the replicas' acknowledgement is implemented with delayed tasks; for the concrete implementation see [DelayedOperationPurgatory--谜之炼狱](https://www.jianshu.com/p/bbb1c4f45b4e).
2. As mentioned earlier, when a broker becomes a Follower it starts a `ReplicaFetcherThread` and keeps pulling the latest data from the Leader with `FetchRequest`s. `ReplicaManager::fetchMessage` handles such a `FetchRequest`: it reads the data to be replicated from the local log and then advances the LogEndOffset of that follower's corresponding local `Replica` object; once the smallest LogEndOffset across the ISR has moved past the Leader's current HighWaterMark, the Leader's HighWaterMark is advanced and `ReplicaManager::tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition))` is called to complete the pending response to the producer (see the sketch after this list).
3. Note that in item 2 the replica that sent the `FetchRequest` has not yet received its response at the moment the Leader's HighWaterMark gets updated. As for the response to the replica's `FetchRequest`:
    1. In `ReplicaManager::fetchMessage`, after `readFromLocalLog` has read the messages from the local log, it first checks whether the response to the `FetchRequest` can be sent immediately:

        ```scala
        // respond immediately if
        // 1) fetch request does not want to wait
        // 2) fetch request does not require any data
        // 3) has enough data to respond
        // 4) some error happens while reading data
        ```
    2. If the response cannot be sent immediately, a `DelayedFetch` has to be constructed to postpone the `FetchRequest` response; this may be
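As referenced in item 2 of the list above, here is a rough sketch of how a follower's fetch can advance the Leader's high watermark and release the parked produce responses. The types and names (`LeaderPartitionSketch`, `DelayedProduce`, `updateLogEndOffset`) are simplified stand-ins, not the actual `Partition` or purgatory code, and the ISR set is assumed to include the leader itself.

```scala
import scala.collection.mutable

// acks=all produce: remember the offset the ISR must reach before we may respond
case class DelayedProduce(requiredOffset: Long, respondToProducer: () => Unit)

class LeaderPartitionSketch(isr: Set[String]) {
  private val logEndOffsets = mutable.Map.empty[String, Long].withDefaultValue(0L)
  private val pending = mutable.Buffer.empty[DelayedProduce]
  var highWatermark: Long = 0L

  def addDelayedProduce(p: DelayedProduce): Unit = pending += p

  // called for the leader after each local append, and for a follower while the broker
  // is serving that follower's FetchRequest (i.e. before its fetch response is sent)
  def updateLogEndOffset(replicaId: String, leo: Long): Unit = {
    logEndOffsets(replicaId) = leo
    val newHW = isr.map(logEndOffsets).min          // smallest LEO across the ISR
    if (newHW > highWatermark) {
      highWatermark = newHW
      tryCompleteDelayedProduce()
    }
  }

  // counterpart of ReplicaManager::tryCompleteDelayedProduce in this sketch:
  // answer producers whose data is now covered by the high watermark
  private def tryCompleteDelayedProduce(): Unit = {
    val (ready, waiting) = pending.partition(_.requiredOffset <= highWatermark)
    pending.clear()
    pending ++= waiting
    ready.foreach(_.respondToProducer())
  }
}
```

This also makes item 3 concrete: the follower's LogEndOffset is recorded, and the HighWaterMark possibly advanced, while the broker is still deciding how to answer that very fetch.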