专栏首页分布式系统进阶Kafka集群Metadata管理Kafka源码分析-汇总

Kafka集群Metadata管理Kafka源码分析-汇总

  • 对于集群中的每一个broker都保存着相同的完整的整个集群的metadata信息;
  • metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;
  • Kafka客户端从任一broker都可以获取到需要的metadata信息;

Metadata的存储在哪里 --- MetadataCache组件
  • 在每个Broker的KafkaServer对象中都会创建MetadataCache组件, 负责缓存所有的metadata信息;
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)
  • 所在文件: core/src/main/scala/kafka/server/MetadataCache.scala
  • 所有的metadata信息存储在map里, key是topic, value又是一个map, 其中key是parition id, value是PartitionStateInfo
private val cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] =
    new mutable.HashMap[String, mutable.Map[Int, PartitionStateInfo]]()
  • PartitionStateInfo: 包括LeaderIsrAndControllerEpoch和Replica数组; 下面的readFrom方法从接受到的buffer构造一个PartitionStateInfo对象:
def readFrom(buffer: ByteBuffer): PartitionStateInfo = {
    val controllerEpoch = buffer.getInt
    val leader = buffer.getInt
    val leaderEpoch = buffer.getInt
    val isrSize = buffer.getInt
    val isr = for(i <- 0 until isrSize) yield buffer.getInt
    val zkVersion = buffer.getInt
    val replicationFactor = buffer.getInt
    val replicas = for(i <- 0 until replicationFactor) yield buffer.getInt
    PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr.toList, zkVersion), controllerEpoch),
                       replicas.toSet)
  }
  • MetadataCache还保存着推送过来的有效的broker信息
private var aliveBrokers: Map[Int, Broker] = Map()
MetadataCache如何获取和更新metadata信息
  • KafkaApis对象处理UpdateMetadataRequest
case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
  • handleUpdateMetadataRequest:
    val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
    
    authorizeClusterAction(request)

    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)

    val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))

可以看到是调用了ReplicaManager.maybeUpdateMetadataCache方法, 里面又会调用到MetadataCache.updateCache方法

  • MetadataCache.updateCache:
      aliveBrokers = updateMetadataRequest.aliveBrokers.map(b => (b.id, b)).toMap
      updateMetadataRequest.partitionStateInfos.foreach { case(tp, info) =>
        if (info.leaderIsrAndControllerEpoch.leaderAndIsr.leader == LeaderAndIsr.LeaderDuringDelete) {
          removePartitionInfo(tp.topic, tp.partition)
        } else {
          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
        }
      }

干三件事

  1. 更新aliveBrokers;
  2. 如果某个topic的的parition的leader是无效的, 则removePartitionInfo(tp.topic, tp.partition);
  3. 新增或更新某个topic的某个parition的信息, addOrUpdatePartitionInfo(tp.topic, tp.partition, info): 将信息meta信息保存到MetadataCachecache对象中;
Metadata信息从哪里来
  • 这个问题实际上就是在问UpdateMetaRequest什么时候发送的;
  • 来源肯定是KafkaController发送的;
  • broker变动, topic创建, partition增加等等时机都需要更新metadata;
谁使用metadata信息
  • 主要是客户端, 客户端从metadata中获取topic的partition信息, 知道leader是谁, 才可以发送和消费msg;
  • KafkaApis对象处理MetadataRequest
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
  • KafkaApis.handleTopicMetadataRequest:
    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]

    //if topics is empty -> fetch all topics metadata but filter out the topic response that are not authorized
    val topics = if (metadataRequest.topics.isEmpty) {
      val topicResponses = metadataCache.getTopicMetadata(metadataRequest.topics.toSet, request.securityProtocol)
      topicResponses.map(_.topic).filter(topic => authorize(request.session, Describe, new Resource(Topic, topic))).toSet
    } else {
      metadataRequest.topics.toSet
    }

    //when topics is empty this will be a duplicate authorization check but given this should just be a cache lookup, it should not matter.
    var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(Topic, topic)))

    if (!authorizedTopics.isEmpty) {
      val topicResponses = metadataCache.getTopicMetadata(authorizedTopics, request.securityProtocol)
      if (config.autoCreateTopicsEnable && topicResponses.size != authorizedTopics.size) {
        val nonExistentTopics: Set[String] = topics -- topicResponses.map(_.topic).toSet
        authorizer.foreach {
          az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) {
            authorizedTopics --= nonExistentTopics
            unauthorizedTopics ++= nonExistentTopics
          }
        }
      }
    }

    val unauthorizedTopicMetaData = unauthorizedTopics.map(topic => new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.TopicAuthorizationCode))

    val topicMetadata = if (authorizedTopics.isEmpty) Seq.empty[TopicMetadata] else getTopicMetadata(authorizedTopics, request.securityProtocol)
    val brokers = metadataCache.getAliveBrokers
    val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata  ++ unauthorizedTopicMetaData, metadataRequest.correlationId)
    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))

看着代码不少, 实际上比较简单:

  1. 先确定需要获取哪些topic的metadata信息, 如果request里未指定topic, 则获取当前所有的topic的metadata信息;
  2. 有效性验证,将topic分为authorizedTopicsunauthorizedTopics;
  3. 获取authorizedTopics的metadata, 注意getTopicMetadata方法是关键所在, 它会先筛选出当前不存在的topic, 如果auto.create.topics.enable=true, 则调用AdminUtils.createTopic先创建此topic, 但此时其PartitionStateInfo为空, 不过也会作为Metadata Response的一部分返回给客户端.

Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 记一次Kafka集群的故障恢复Kafka源码分析-汇总

    扫帚的影子
  • ReplicaManager源码解析2-LeaderAndIsr 请求响应

    其中最主要的操作调用ReplicaManager.becomeLeaderOrFollower来初始化Partition

    扫帚的影子
  • 玩转 Kafka Raft 模式 - 入门宝典

    ? ? 导读 Apache Kafka 将会在3.0 的版本里去掉 Apache Zookeeper 的依赖独立运行。3.0的的Kafka 从架构,代码层面相...

    腾讯云中间件团队
  • 【Kafka】使用Wireshark抓包分析Kafka通信协议

    Wireshark (前身 Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。

    图样图森跛
  • 如何使用Sentry为Kafka赋权

    Fayson
  • Kafka运维填坑Kafka源码分析-汇总

    调用Runtime.getRuntime.halt(1)直接暴力退出了. 可参考Kafka issue: Unclean leader election an...

    扫帚的影子
  • kafka MetaData

    Metadata只是一个信息副本,其更新由MetadataUpdater完成。MetadataUpdater由NetworkClient调用,用于更新Metad...

    平凡的学生族
  • Kafka的日志管理模块--LogManagerKafka源码分析-汇总

    a. 如果kafka进程是优雅干净地退出的,会创建一个名为.kafka_cleanshutdown的文件作为标识; b. 启动kafka时, 如果不存在该文件...

    扫帚的影子
  • 0836-Apache Druid on HDP

    Apache Druid是一个分布式的、面向列的、实时分析数据库,旨在快速获取大量数据并将其编入索引,并对大型数据集进行快速的切片和切分分析(“OLAP查询),...

    Fayson
  • Kafka学习笔记之Kafka性能测试方法及Benchmark报告

      本文主要介绍了如何利用Kafka自带的性能测试脚本及Kafka Manager测试Kafka的性能,以及如何使用Kafka Manager监控Kafka的工...

    Jetpropelledsnake21
  • Kafka中的时间轮Kafka源码分析-汇总

    将TimerTask对象绑定到 TimerTaskEntry上 如果这个TimerTask对象之前已经绑定到了一个 TimerTaskEntry上, 先调用t...

    扫帚的影子
  • 如何使用StreamSets实时采集Kafka数据并写入Hive表

    Fayson
  • 大数据全体系年终总结

      1、文件存储当然是选择Hadoop的分布式文件系统HDFS,当然因为硬件的告诉发展,已经出现了内存分布式系统Tachyon,不论是Hadoop的MapRed...

    用户3003813
  • KafkaProducer源码分析

    Broker:Kafka的服务端即Kafka实例,Kafka集群由一个或多个Broker组成,主要负责接收和处理客户端的请求

    用户6070864
  • 基于Kafka+ELK搭建海量日志平台

    早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是 less 或者 tail。如果服务部署了好几台,就要分别登录到这几台机器上...

    王知无-import_bigdata
  • kubernetes 中 kafka 和 zookeeper 有状态集群服务部署实践 (一)

    本文将尝试根据社区提供的 StatefulSet 方案,对 kafka 和 zookeeper 服务进行部署。具体的部署过程包括以下几个部署: Persiste...

    腾讯云容器服务团队
  • Kafka剖析系列之Benchmark

    性能测试及集群监控工具 本章将介绍Kafka提供的性能测试工具,Metrics报告工具及Yahoo开源的Kafka Manager。 Kafka性能测试脚本 $...

    用户1263954
  • 独家分享:Kafka集群间信息复制攻略来了!

    自2006年以来,曾就职于SonyEricsson、SAP等多家公司,历任软件开发工程师,数据开发工程师,解决方案架构师

    腾讯云中间件团队
  • kafka高版本Client连接0.9Server引发的血案排查

    在一个月黑风高的夜晚,我们kafka生产端开始疯狂告警,出现大量程序队列堵塞、数据写入失败、写入性能下降的告警。

    皮皮熊

扫码关注云+社区

领取腾讯云代金券