
Kafka Startup (1): The Entry Function


References

jaceklaskowski's gitbook on Apache Kafka internals

1. Entry Point

The program starts from the main function of kafka.Kafka:

def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)  // read server.properties
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)  // create the KafkaServerStartable

      try {
        if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
          new LoggingSignalHandler().register()
      } catch {
        case e: ReflectiveOperationException =>
          warn("Failed to register optional signal handler that logs a message when the process is terminated " +
            s"by a signal. Reason for registration failure is: $e", e)
      }

      // attach shutdown handler to catch terminating signals as well as normal termination
      Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)

      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Exiting Kafka due to fatal exception", e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }
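Taken together: main() starts the server, then parks on awaitShutdown(); the shutdown hook registered above calls kafkaServerStartable.shutdown when the process receives a termination signal, which lets awaitShutdown() return and main() reach Exit.exit(0). A minimal stand-alone sketch of that hook-plus-latch pattern (not Kafka code; HookDemo is a made-up name):

import java.util.concurrent.CountDownLatch

// The shutdown hook plays the role of kafkaServerStartable.shutdown,
// and latch.await() plays the role of awaitShutdown().
object HookDemo extends App {
  val latch = new CountDownLatch(1)

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    println("shutting down...")
    latch.countDown()            // release the waiting main thread
  }))

  println("running; send SIGTERM or press Ctrl+C to stop")
  latch.await()                  // blocks until the hook fires
  println("exited cleanly")
}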

1.1 getPropsFromArgs(args)

// Kafka.scala
def getPropsFromArgs(args: Array[String]): Properties = {
    ...
    val props = Utils.loadProps(args(0))

At startup, the argument config/server.properties is passed in:

(Screenshot: startup arguments)

We can see that Utils.loadProps returns a Properties object:

// Utils.java
public static Properties loadProps(String filename) throws IOException {
    return loadProps(filename, null);
}

(Screenshot: reading the parameters)

The purpose of this function is to load the settings in config/server.properties into a Properties object for later use.
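Concretely, loading a .properties file comes down to java.util.Properties.load. A minimal sketch (this is not Kafka's actual Utils.loadProps; the log.dirs key is just an example of what can then be read):

import java.io.FileInputStream
import java.util.Properties

// Load a .properties file into a Properties object.
def loadProps(path: String): Properties = {
  val props = new Properties()
  val in = new FileInputStream(path)
  try props.load(in) finally in.close()
  props
}

val serverProps = loadProps("config/server.properties")
println(serverProps.getProperty("log.dirs"))  // e.g. /tmp/kafka-logs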

1.2 KafkaServerStartable.fromProps(serverProps)

object KafkaServerStartable {
  def fromProps(serverProps: Properties): KafkaServerStartable = {
    fromProps(serverProps, None)
  }

  def fromProps(serverProps: Properties, threadNamePrefix: Option[String]): KafkaServerStartable = {
    val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))  // doesn't seem to do much here
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters, threadNamePrefix)
  }
}

This function does two things: 1. create the reporters; 2. create the KafkaServerStartable.

Debugging up to this point:

(Screenshot: reporters is empty)

We can see that reporters is an empty array, so we can skip this variable for now.
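Presumably it is empty because server.properties configures no metrics reporters. As a hedged aside (I'm assuming the legacy "kafka.metrics.reporters" and "kafka.csv.metrics.reporter.enabled" properties here), configuring a reporter class would make startReporters return a non-empty list:

// Hypothetical: configure a reporter before building the startable,
// so KafkaMetricsReporter.startReporters has something to instantiate.
serverProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
serverProps.put("kafka.csv.metrics.reporter.enabled", "true")
val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))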

Next, look at KafkaServerStartable. It holds a KafkaServer and defines simple functions such as startup and shutdown, all of which operate on the server field:

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  def startup(): Unit = {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }

  def awaitShutdown(): Unit = server.awaitShutdown()

}

1.3 Starting KafkaServerStartable

The last two statements of the main function are:

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()

As shown above, both of these calls are delegated to the KafkaServer.

Let's first look at the implementation of kafkaServerStartable.awaitShutdown():

// KafkaServerStartable.scala
def awaitShutdown(): Unit = server.awaitShutdown()
// KafkaServer.scala
private var shutdownLatch = new CountDownLatch(1)
...
def awaitShutdown(): Unit = shutdownLatch.await()

It simply waits on the CountDownLatch; the latch is counted down when the server shuts down, which lets main() proceed to Exit.exit(0).

Next, let's focus on KafkaServer's startup method.

KafkaServer::startup

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    ...
  }

This function is long, over 150 lines. According to the comment, its job is to bring up a single instance of the Kafka server, and it mainly does three things: instantiate the LogManager, the SocketServer, and the request handlers (KafkaRequestHandlers).
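To orient ourselves, here is a heavily abridged view of the body, keeping only the calls examined in the rest of this article (the real method interleaves many more steps between them):

// KafkaServer.scala (abridged: only the calls discussed below are kept)
def startup(): Unit = {
  /* setup zookeeper */
  initZkClient(time)
  ...
  /* start log manager */
  logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState,
    kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
  logManager.startup()
  ...
  // Create and start the socket server acceptor threads so that the bound port is known.
  socketServer = new SocketServer(config, metrics, time, credentialProvider)
  socketServer.startup(startupProcessors = false)
  ...
}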

initZkClient

Look at this line first. The comment and the function name make it clear: it initializes the ZooKeeper client and applies some configuration.

// KafkaServer.scala
/* setup zookeeper */
initZkClient(time)

Following the call chain:

private def initZkClient(time: Time): Unit = {
    info(s"Connecting to zookeeper on ${config.zkConnect}")

    def createZkClient(zkConnect: String, isSecure: Boolean) = {
      KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
        config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig))
    }
    ...
    _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
}

// KafkaZkClient.scala
def apply(connectString: String, ...) = {
  val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
    time, metricGroup, metricType, name, zkClientConfig)
  new KafkaZkClient(zooKeeperClient, isSecure, time)
}

As KafkaZkClient::apply shows, the code creates a ZooKeeperClient and uses it to build a KafkaZkClient. Reading the two classes' comments, the former handles the interaction with ZooKeeper, while the latter is Kafka's own ZooKeeper client.

ZooKeeperClient internally maintains an org.apache.zookeeper.ZooKeeper instance, which is the actual client Kafka uses to talk to ZooKeeper:

// ZooKeeperClient.scala
  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
...
  // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
    clientConfig)
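To make the relationship concrete, here is a tiny stand-alone sketch using the raw org.apache.zookeeper.ZooKeeper client that ZooKeeperClient wraps. The address localhost:2181 and the /brokers/ids path are assumptions for illustration; in the real code the address comes from config.zkConnect, and /brokers/ids is where brokers register themselves:

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

// Connect directly with the raw client and list registered broker ids.
val zk = new ZooKeeper("localhost:2181", 6000, new Watcher {
  override def process(event: WatchedEvent): Unit =
    println(s"zk event: ${event.getState} ${event.getType}")
})

val brokerIds = zk.getChildren("/brokers/ids", false)  // assumed path
println(s"live brokers: $brokerIds")
zk.close()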

LogManager

The comment on KafkaServer::startup mentions that the LogManager is instantiated during startup. The following code in that function starts the LogManager:

/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()

For more on what this class does, see the gitbook page kafka-log-LogManager.html and the article "Kafka LogManager详解(六)".

Let's look at the class comment. It says:

  • This class is the entry point to Kafka's log management subsystem, responsible for log creation, retrieval, and cleaning. All read and write operations are delegated to the individual Log instances.
  • The LogManager keeps logs in one or more directories. New logs are created in the data directory with the fewest logs; no attempt is made to move partitions after the fact or to balance based on size or I/O rate.
  • A background thread handles log retention by periodically truncating excess log segments (a toy sketch of this idea follows after the class comment below).
/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * A background thread handles log retention by periodically truncating excess log segments.
 */
@threadsafe
class LogManager(logDirs: Seq[File],
    ...
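The last bullet, background retention, can be pictured as a scheduled task that periodically drops segments older than the retention window. This is only an illustration of the idea; Segment and retainRecent below are made up and are not LogManager's actual API:

import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: keep segments younger than retentionMs.
final case class Segment(baseOffset: Long, lastModifiedMs: Long)

def retainRecent(segments: Seq[Segment], retentionMs: Long, nowMs: Long): Seq[Segment] =
  segments.filter(s => nowMs - s.lastModifiedMs <= retentionMs)

// A single background thread wakes up periodically, like LogManager's retention thread.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  def run(): Unit = println(s"retention check at ${System.currentTimeMillis()}")
}, 0, 5, TimeUnit.MINUTES)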

The LogManager's first parameter, logDirs, points at the log storage directories. In the debugger the path is /tmp/kafka-logs:

(Screenshot: the log directory path)

Inside that directory we can see the topic I created earlier, demo_topic, along with its logs.

(Screenshot: the log folder)
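The same contents can be listed from code. A quick sketch (the /tmp/kafka-logs path matches what the debugger showed; the demo_topic-0 naming, topic name plus partition number, is how Kafka lays out partition directories):

import java.io.File

// List the per-partition folders under the log directory, e.g. demo_topic-0.
val logDir = new File("/tmp/kafka-logs")
Option(logDir.listFiles()).getOrElse(Array.empty[File])
  .filter(_.isDirectory)
  .map(_.getName)
  .sorted
  .foreach(println)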

SocketServer

Another job of KafkaServer::startup is to start the SocketServer; the following code does that.

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

Reading the class's source, SocketServer handles new connections, requests, and responses to and from the broker. Kafka supports two types of request planes:

  1. Data plane: handles requests from clients and from other brokers in the cluster.
  • The threading model is:
  • one Acceptor thread per listener, which handles new connections;
  • each Acceptor has N Processor threads, each with its own selector, reading requests from sockets;
  • M Handler threads handle the requests and produce responses back to the Processor threads for writing.
  • Multiple data planes can be configured by specifying multiple ","-separated endpoints for "listeners" in KafkaConfig.
  2. Control plane: handles requests from the controller. This is optional and is configured by setting "control.plane.listener.name"; if it is not set, controller requests are handled by the data plane.
  • The threading model is:
  • one Acceptor thread that handles new connections;
  • the Acceptor has one Processor thread with its own selector, reading requests from the socket;
  • one Handler thread that handles the requests and produces responses back to the Processor thread for writing.

Seeing Acceptor, Processor, and Handler, anyone familiar with the Reactor pattern will recognize this immediately; Kafka's network model is laid out clearly here. A toy sketch of this threading split follows.
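Here is that toy sketch of the Acceptor / Processor / Handler split. It is not Kafka's SocketServer (which uses non-blocking selectors and a RequestChannel between processors and handlers); it only shows the shape of the threading model, and the port 9099 is arbitrary:

import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.net.ServerSocket
import java.util.concurrent.{ArrayBlockingQueue, Executors}

object MiniReactor extends App {
  // Requests read by Processors wait here until a Handler picks them up.
  final case class Request(line: String, out: PrintWriter)
  val requestQueue = new ArrayBlockingQueue[Request](1024)

  // N "Processor" threads: read a request off a socket and enqueue it.
  val processors = Executors.newFixedThreadPool(3)

  // M "Handler" threads: take requests and produce responses.
  // (In Kafka the response goes back to the Processor for writing; here we write directly.)
  val handlers = Executors.newFixedThreadPool(4)
  (1 to 4).foreach { _ =>
    handlers.submit(new Runnable {
      def run(): Unit = while (true) {
        val req = requestQueue.take()
        req.out.println(s"echo: ${req.line}")
      }
    })
  }

  // The main thread plays the single Acceptor: it only accepts connections and hands them off.
  val serverSocket = new ServerSocket(9099)
  while (true) {
    val socket = serverSocket.accept()
    processors.submit(new Runnable {
      def run(): Unit = {
        val in  = new BufferedReader(new InputStreamReader(socket.getInputStream))
        val out = new PrintWriter(socket.getOutputStream, true)
        requestQueue.put(Request(in.readLine(), out))
      }
    })
  }
}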

(See also: background on the Kafka controller.)

/**
 * Handles new connections, requests and responses to and from broker.
 * Kafka supports two types of request planes :
 *  - data-plane :
 *    - Handles requests from clients and other brokers in the cluster.
 *    - The threading model is
 *      1 Acceptor thread per listener, that handles new connections.
 *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
 *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
 *  - control-plane :
 *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
 *      If not configured, the controller requests are handled by the data-plane.
 *    - The threading model is
 *      1 Acceptor thread that handles new connections
 *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
 *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
 */
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {

We'll dig deeper into these components later.

Summary

We took a quick tour of Kafka's startup flow and met the LogManager and the SocketServer, which correspond to Kafka's log management and Kafka's network model respectively. These two are what we'll focus on next.
