Kafka初始化流程与请求处理

Kafka的初始化启动流程

  • 由KafkaServer::startup来负责;
  • KafkaServer::startup主要是创建并启动各种Manager;
  • 上图:

kafkaserver_startup.png

  • KafkaHealthcheck: core/src/main/scala/kafka/server/KafkaHealthcheck.scala,其作用是在broker info注册到zk的/brokers/id路径下, 且监听zk的session expiration事件,触发时重新注册;
  • 上图中的各个启动的组件我们慢慢都会介绍到, 先从请求的接收与响应开始~~~

请求处理

  • SocketServer: 负责处理网络连接, 数据的接收和发送, 其中的RequestChannel负责向应用层转递请求,也负责把应用层的response传回网络层后发送出去; 详细见:Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3
  • KafkaRequestHandlerPool: 线程池, 每个线程里跑一个KafkaRequestHandler
  • KafkaRequestHandler: 循环调用RequestChannel::receiveRequest来poll到新的request交给KafkaApis处理;
  • KafkaApis: 处理request的分发
request.requestId match {
        case RequestKeys.ProduceKey => handleProducerRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId

上图:

kafkaapis.png

下篇我们开始KafkaController分析-选主和Failover

Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏xcywt

WireShark 使用

1、干货 Wireshark(前称Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。Wire...

26690
来自专栏北京马哥教育

记一次惊心的网站 TCP 队列问题排查经历

19050
来自专栏向治洪

认识Kubernates(K8S)

在后端开发中,在介绍Jenkins的可伸缩部署方式上,主要有两种方式:一种是基于Docker(或者docker-swarm 集群)的部署方式,另外一种是基于ku...

92480
来自专栏顶级程序员

记一次惊心的网站TCP队列问题排查经历

此时问题已经影响到整个网站的正常业务,我的那个心惊的呀,最主要报警系统没有任何报警,服务运行一切正常,瞬时背上的汗已经出来了。但还是要静心,来仔细寻找蛛丝马迹,...

13420
来自专栏进击的程序猿

raft 系列解读(2) 之 测试用例raft 系列解读(2) 之 测试用例

基于mit的6.824课程,github代码地址:https://github.com/zhuanxuhit/distributed-system

17020
来自专栏xcywt

TCP头部分析与确认号的理解

1、TCP的特点: 基于字节流 面向连接 可靠传输 缓冲传输 全双工 流量控制 2、头部格式和说明 图源百度。如下图示,就是TCP包的头部结构。可以看到这个头部...

404100
来自专栏desperate633

TCP协议浅析TCP概述TCP可靠数据传输TCP流量控制TCP连接管理

上图我们进行一个分析,以便搞清楚tcp序列号和ack的应用 首先,hostA作为发送方给B发送数据,随机选择一个序列号seq = 42,也就是这段segmen...

15820
来自专栏郑家乐的专栏

多版本 Node.js 使用 Workflow

NodeJS 成为新一届的版本帝后,需要预编译的模块常常更新不够及时,就会出现我这样上班时间搞环境 ,那么如何保持一机多版本继续使用低版本的 NodeJS 运行...

36400
来自专栏Web 开发

Web前端安全学习-CSRF

在数据库有了一层安全保护之后,攻击者们的目标,从服务器转移到了用户身上。由此,出现了CSRF攻击和XSS攻击。

10000
来自专栏前端开发

前端安全问题之-CSRF攻击

30630

扫码关注云+社区

领取腾讯云代金券