前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka是如何处理客户端发送的数据的?

Kafka是如何处理客户端发送的数据的?

作者头像
扫帚的影子
发布2018-09-05 16:57:54
1.9K0
发布2018-09-05 16:57:54
举报

  • 客户端的ProduceRequest如何被Kafka服务端接收?又是如何处理? 消息是如何同步到复本节点的? 本篇文章都会讲到, 实际上就是综合运用了上面第三点中的内容
  • 上一节我们讲到所有的Request最终都会进入到KafkaApis::handle方法后根据requestId作分流处理, ProduceRequest也不例外;

Topic的Leader和Follower角色的创建

  • 之前在ReplicaManager源码解析2-LeaderAndIsr 请求响应中留了个尾巴,现在补上;
  • 通过Kafka集群建立过程分析我们知道,Kafkaf集群的Controller角色会监听zk上/brokers/topics节点的变化,当有新的topic信息被写入后,Controller开始处理新topic的创建工作;
  • Controller 使用Partition状态机Replica状态机来选出新topic的各个partiton的主,isr列表等信息;
  • Controller 将新topic的元信息通知给集群中所有的broker, 更新每台borker的Metadata cache;
  • Controller 将新topic的每个partiton的leader, isr , replica list信息通过LeaderAndIsr Request发送到对应的broker上;
  • ReplicaManager::becomeLeaderOrFollower 最终会处理Leader或Follower角色的创建或转换;
  • Leader角色的创建或转换:
    1. 停掉partition对应的复本同步线程; replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
    2. 将相应的partition转换成Leader partition.makeLeader(controllerId, partitionStateInfo, correlationId),其中最重要的是leaderReplica.convertHWToLocalOffsetMetadata(), 在Leader replica上生成新的high watermark;
  • Follower角色的创建或转换:
    1. 将相应的partition转换成Follower partition.makeFollower(controllerId, partitionStateInfo, correlationId)
    2. 停掉已存在的复本同步线程 replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
    3. 截断Log到当前Replica的high watermark logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
    4. 重新开启当前有效复本的同步线程 replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset), 同步线程会不停发送FetchRequest到Leader来拉取新的消息

客户端消息的写入

  • kafka客户端的ProduceRequest只能发送给Topic的某一partition的Leader
  • ProduceRequest在Leader broker上的处理 KafkaApis::handleProducerRequest
    1. 使用authorizer先判断是否有未验证的RequestInfo val (authorizedRequestInfo, unauthorizedRequestInfo) = produceRequest.data.partition { case (topicAndPartition, _) => authorize(request.session, Write, new Resource(Topic, topicAndPartition.topic)) }
    2. 如果RequestInfo都是未验证的,则不会处理请求中的数据 sendResponseCallback(Map.empty)
    3. 否则, 调用replicaManager来处理消息的写入;
    4. 流程图:

handlerequest.png

  • Leader通过调用ReplicaManager::appendMessages,将消息写入本地log文件(虽写入了log文件,但只是更新了LogEndOffset, 还并未更新HighWaterMark, 因此consumer此时无法消费到),同时根据客户端所使用的ack策略来等待写入复本;
1. 等待复本同步的反馈,利用了延迟任务的方式,其具体实现可参考[DelayedOperationPurgatory--谜之炼狱](https://www.jianshu.com/p/bbb1c4f45b4e), 
1. 前面说过Follower在成为Follower的同时会开启`ReplicaFetcherThread`,通过向Leader发送`FetchRequest`请求来不断地从Leader来拉取同步最新数据, `ReplicaManager::fetchMessage`处理`FetchRequest`请求,从本地log文件中读取需要同步的数据,然后更新本地对应的`Replica`的LogEndOffset, 同时如果所有isr中的最小的LogEndOffset都已经大于当前Leader的HighWaterMark了, 那么Leader的HighWaterMark就可以更新了, 同时调用`ReplicaManager::tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicAndPartition))`来完成对客户端发送消息的回应.
2. 从上面的1中我们看到实际上发送`FetchRequest`的replica还未收到Response,这个`Leader`的HighWaterMark可能已经就更新了;对于Replica的FetchRequest的回应
1. 在`ReplicaManager::fetchMessage`, 调用`readFromLocalLog`从本地log中读取消息后,先判断是否可以立即发送`FetchRequest`的response: 

// respond immediately if

// 1) fetch request does not want to wait

// 2) fetch request does not require any data

// 3) has enough data to respond

// 4) some error happens while reading data

1. 如查不能立即发送, 需要构造`DelayedFetch`来延迟发送`FetchRequest`的response, 这可能是
Kafka源码分析-汇总
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.09.04 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Topic的Leader和Follower角色的创建
  • 客户端消息的写入
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档