KafkaController分析7-启动流程

前面我们已经分析了KafkaController中使用的一系列组件, 从本章开始,我们开始介绍KafkaController的各个功能:
KafkaController分析1-选主和Failover
KafkaController分析2-NetworkClient分析
KafkaController分析3-ControllerChannelManager
KafkaController分析4-Partition选主
KafkaController分析5-Partition状态机
KafkaController分析6-Replica状态机
KafkaController启动流程
  • 注册zk的SessionExpiration事件通知:registerSessionExpirationListener, 当session到期且新session建立后,进行controller的重新选主;
def handleNewSession() {
      info("ZK expired; shut down all controller components and try to re-elect")
      inLock(controllerContext.controllerLock) {
        onControllerResignation()
        controllerElector.elect
      }
    }
  • 启动 ZookeeperLeaderElector:controllerElector.startup. 如果当前broker成功选为Controller, 则onControllerFailover回调被触发.
      readControllerEpochFromZookeeper()
      incrementControllerEpoch(zkUtils.zkClient)
      registerReassignedPartitionsListener()
      registerIsrChangeNotificationListener()
      registerPreferredReplicaElectionListener()
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      initializeControllerContext()
      replicaStateMachine.startup()
      partitionStateMachine.startup()
      brokerState.newState(RunningAsController)
      maybeTriggerPartitionReassignment()
      maybeTriggerPreferredReplicaElection()
      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
      if (config.autoLeaderRebalanceEnable) {
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
  1. 更新zk上的controller epoch信息;
  2. 注册zk上的broker/topic节点变化事件通知;
  3. 初始化ControllerContext, 主要是从zk上获取broker, topic, parition, isr, partition leader, replicas等信息;
  4. 启动ReplicaStateMachine;
  5. 启动PartitionStateMachine;
  6. 发送所有的partition信息(leader, isr, replica, epoch等)到所有的 live brokers;
  7. 如果允许自动leader rebalance的话, 则启动AutoRebalanceScheduler;
  8. 启动TopicDeletionManager;
  • KafkaController的启动图解:

KafkaController.png

Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ceph对象存储方案

Bluestore下的OSD开机自启动分析

整个Bluestore现在由官方推出的ceph-volume工具进行管理,用以替代之前的ceph-disk。回顾之前ceph-disk是通过在xfs文件系统中打...

78240
来自专栏坚毅的PHP

jersey处理支付宝异步回调通知的问题:java.lang.IllegalArgumentException: Error parsing media type 'application/x-www

tcpflow以流为单位分析请求内容,非常适合服务器端接口类服务查问题 这次遇到的问题跟支付宝支付后的回调post结果有关 淘宝的代码例子: publi...

62850
来自专栏非典型程序猿

Golang任务队列machinery使用与源码剖析(二)

在Golang任务队列machinery使用与源码剖析(一)一文中,我们主要对golang中任务队列machinery的设计结构以及具体模块的功能与源码实现进行...

1.7K80
来自专栏腾讯IVWEB团队的专栏

React Native 与 OC 之间通信那些事儿

React Native 用 IOS 自带的 JavaScriptCore 作为 JS 的解析引擎,普通的 JS-OC 通信就是 React Native 在...

52300
来自专栏IMWeb前端团队

React Native与OC之间通信那些事儿

React Native用IOS自带的JavaScriptCore作为JS的解析引擎,普通的JS-OC通信就是React Native在OC定义一个模块方法,J...

24170
来自专栏JackeyGao的博客

SLB和django runserver结合报错问题

SLB 检测流量会使服务器报[Errno 104] Connection reset by peer

15110
来自专栏Ceph对象存储方案

Luminous下删除和新建OSD的正确姿势

L版本开始极大的降低了对运维操作复杂度,新增了很多命令去确保数据安全,很多新手在删除OSD的时候很容易忽视了集群PGs的状态最终导致数据丢失,因此官方加入以下几...

41920
来自专栏Hellovass 的博客

社交化分享组件踩坑

问题是这样的,项目里的社交化分享是基于 UMShare 封装成的一个 ShareLib module,为了让这个 module 对调用者说更透明,我将 WXEn...

37050
来自专栏cloudskyme

网页中显示xml,直接显示xml格式的文件

第一种方法 使用<pre></pre>包围代码(在浏览器中测试不行啊,但是在富编辑器中又可以,怪); 使用<xmp></xmp>包围代码(官方不推荐,但是效果不...

51670
来自专栏逸鹏说道

9.数据库服务器部署之------3步实现远程访问

平台之大势何人能挡? 带着你的Net飞奔吧!:http://www.cnblogs.com/dunitian/p/4822808.html#iis 原文:htt...

36360

扫码关注云+社区

领取腾讯云代金券