Kafka除了状态管理以外,还主要有以下功能职责:
Client可以从任何一台broker上获取集群完整的元数据信息,这就需要controller在集群元数据信息发生变更后通知每一个broker。当有分区信息变更时,controller会将变更后的信息封装进UpdateMetadataRequest请求中,然后发送给集群中的每个Broker。
Controller启动时会创建一个Zookeeper监听器,该监听器的作用是监听/brokers/topics下子节点的变化情况。
Topic在创建时首先会在/brokers/topics节点下面创建一个子节点(znode),并且将该Topic的分区和副本情况写入这个znode中。当监听器发现/brokers/topics有新增znode时,就会立即触发controller创建Topic(为新的Topic每个分区确定Leader和ISR),然后更新集群元数据信息。
当创建完成以后,controller会创建一个新的监听器/brokers/topics/{topic_name},监听新的Topic内容的变更。这样当Topic分区发生变化时,controller能够及时收到通知。
Kafka集群删除Topic都是向Zookeeper下的/admin/deletetopics下新增一个子节点(znode)。controller在启动时会在/admin/deletetopics创建一个监听器监听该节点的子节点是否发生变更。当有新的子节点被创建时,controller立即开启删除Topic删除逻辑。主要执行两步:
分区重分配主要为了对Topic下面的所有分区重新分配副本所在的broker机器,以期望实现更均匀的分配。该操作需要管理员手动定制分配方案,并按照指定格式写入Zookeeper的/admin/reassign_partitions下。
分区重分配的过程是先扩展再收缩的过程。controller首先将分区副本进行扩展,等他们全部与Leader副本同步以后将Leader设置为新分配方案中的副本,最后执行收缩,将分区副本减少为分配方案中的副本集合。
为了避免分区副本分配不均匀,引入preferred副本的概念,假设一个分区副本的列表是[1,2,3],那么broker1就是该分区的preferred leader。但是随着时间的推进,分区leader发生变化,最后preferred leader不一定就是分区leader。
上面两种方法都会往Zookeeper的/admin/preferredreplicaelection节点写入数据。controller也会注册该节点的目录监听器。一旦接收到改变通知,controller会将对应分区的leader调整回副本列表中的第一个,并且广播出去。
当前增加分区通常通过kafka-topics.sh的--alert选项,它会向Zookeeper下的/brokers/topics/{待修改的Topic}中写入新的分区目录。
由于在创建Topic以后,会注册一个监听该目录变化监听器,因此当目录发生变化时,controller会收到通知,执行分区创建任务,之后更新集群元数据信息。
每个broker创建成功后,都会在Zookeeper的/brokers/ids节点下创建一个子节点,并写入broker的信息。controller会在/brokers/ids创建一个监听该节点的子节点是否发生变化的监听器,一旦收到通知,然后就会更新集群元数据信息。
broker在加入集群时注册的节点是个临时节点,伴随broker的崩溃,临时节点消失,然后controller会收到通知,开启关闭broker逻辑,并更新集群元数据信息。
受控关闭是指的以kafka-server-stop.sh或者kill -15的方式关闭kafka broker。
受控关闭是由即将关闭的broker向controller发送ControlledShutdownRequest。当发送完请求后,broker处于阻塞状态,controller会进行leader重选举和ISR收缩调整后,会给broker发送ControlledShutdownResoponse,表示broker可以关闭。
当所有broker一起启动时,会争先在Zookeeper上创建/controller临时节点,并写入controller broker的信息,Zookeeper可以保证只有一个broker可以创建成功,创建成功的broker节点还会更新/controller_epoch的值。没有创建成功的broker监听/controller,加入controller宕机,其他broker将会收到通知,然后去竞选controller。
controller启动时会与集群中的所有broker(包括controller在的broker)建立TCP连接,并且会为每个TCP连接建立一个RequestSendThread,也就是说controller会和每个broker建立一个TCP连接,并且开启一个I/O线程。
controller目前主要有以下三种请求: