前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka初始化流程与请求处理

Kafka初始化流程与请求处理

作者头像
扫帚的影子
发布2018-09-05 17:14:03
1.1K0
发布2018-09-05 17:14:03
举报
文章被收录于专栏:分布式系统进阶

Kafka的初始化启动流程

  • 由KafkaServer::startup来负责;
  • KafkaServer::startup主要是创建并启动各种Manager;
  • 上图:

kafkaserver_startup.png

  • KafkaHealthcheck: core/src/main/scala/kafka/server/KafkaHealthcheck.scala,其作用是在broker info注册到zk的/brokers/id路径下, 且监听zk的session expiration事件,触发时重新注册;
  • 上图中的各个启动的组件我们慢慢都会介绍到, 先从请求的接收与响应开始~~~

请求处理

  • SocketServer: 负责处理网络连接, 数据的接收和发送, 其中的RequestChannel负责向应用层转递请求,也负责把应用层的response传回网络层后发送出去; 详细见:Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3
  • KafkaRequestHandlerPool: 线程池, 每个线程里跑一个KafkaRequestHandler
  • KafkaRequestHandler: 循环调用RequestChannel::receiveRequest来poll到新的request交给KafkaApis处理;
  • KafkaApis: 处理request的分发
代码语言:javascript
复制
request.requestId match {
        case RequestKeys.ProduceKey => handleProducerRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
        case RequestKeys.JoinGroupKey => handleJoinGroupRequest(request)
        case RequestKeys.HeartbeatKey => handleHeartbeatRequest(request)
        case RequestKeys.LeaveGroupKey => handleLeaveGroupRequest(request)
        case RequestKeys.SyncGroupKey => handleSyncGroupRequest(request)
        case RequestKeys.DescribeGroupsKey => handleDescribeGroupRequest(request)
        case RequestKeys.ListGroupsKey => handleListGroupsRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId

上图:

kafkaapis.png

下篇我们开始KafkaController分析-选主和Failover

Kafka源码分析-汇总
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2016.12.30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka的初始化启动流程
  • 请求处理
  • 上图:
  • 下篇我们开始KafkaController分析-选主和Failover
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档