
Kafka Startup 1: The Entry Function

Reference

jaceklaskowski gitbooks apache-kafka

1. The entry point

The program starts from the main function of kafka.Kafka:

def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)  // read server.properties
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)  // create the KafkaServerStartable

      try {
        if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
          new LoggingSignalHandler().register()
      } catch {
        case e: ReflectiveOperationException =>
          warn("Failed to register optional signal handler that logs a message when the process is terminated " +
            s"by a signal. Reason for registration failure is: $e", e)
      }

      // attach shutdown handler to catch terminating signals as well as normal termination
      Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)

      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Exiting Kafka due to fatal exception", e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }

1.1 getPropsFromArgs(args)

// Kafka.scala
def getPropsFromArgs(args: Array[String]): Properties = {
    ...
    val props = Utils.loadProps(args(0))

At startup, config/server.properties is passed in as the argument (this is the path normally supplied by bin/kafka-server-start.sh):

(Screenshot: startup arguments)

We can see that Utils.loadProps returns a Properties object:

// Utils.java
public static Properties loadProps(String filename) throws IOException {
        return loadProps(filename, null);
    }

(Screenshot: reading the parameters)

The purpose of this function is to read the settings in config/server.properties into a Properties object for later use.
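Under the hood this is just the standard java.util.Properties file-loading pattern. Below is a minimal, hypothetical sketch of that idea, not Kafka's actual Utils.loadProps (the name LoadPropsSketch is made up):

// A simplified sketch of loading a properties file, e.g. "config/server.properties"
import java.io.FileInputStream
import java.util.Properties

object LoadPropsSketch {
  def load(filename: String): Properties = {
    val props = new Properties()
    val in = new FileInputStream(filename)
    try props.load(in)      // parses the key=value pairs in the file
    finally in.close()
    props
  }

  def main(args: Array[String]): Unit = {
    val props = load(args(0))                // args(0) would be the config path
    println(props.getProperty("broker.id"))  // read an individual setting
  }
}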

1.2 KafkaServerStartable.fromProps(serverProps)

object KafkaServerStartable {
  def fromProps(serverProps: Properties): KafkaServerStartable = {
    fromProps(serverProps, None)
  }

  def fromProps(serverProps: Properties, threadNamePrefix: Option[String]): KafkaServerStartable = {
    val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))  // doesn't seem to do much here
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters, threadNamePrefix)
  }
}

This function does two things: 1. create the reporters; 2. create the KafkaServerStartable.

Debugging up to this point:

(Screenshot: reporters is empty)

We can see that reporters is an empty array, so we can skip this variable.

Next, look at KafkaServerStartable. It wraps a KafkaServer and defines simple functions such as startup and shutdown, all of which operate on the server field.

class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging {
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix)

  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  def startup(): Unit = {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      case _: Throwable =>
        fatal("Halting Kafka.")
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }

  def awaitShutdown(): Unit = server.awaitShutdown()

}

1.3 Starting KafkaServerStartable

The last two statements of the main function are:

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()

As shown above, both of these calls are ultimately carried out by KafkaServer.

Let's first look at the implementation of kafkaServerStartable.awaitShutdown():

// KafkaServerStartable.scala
def awaitShutdown(): Unit = server.awaitShutdown()
// KafkaServer.scala
private var shutdownLatch = new CountDownLatch(1)
...
def awaitShutdown(): Unit = shutdownLatch.await()

It simply waits on a CountDownLatch.
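As a side note, here is a minimal, hypothetical sketch of that latch pattern (not Kafka's code; the name ServerSketch is made up): the main thread parks in awaitShutdown() until a shutdown hook counts the latch down.

import java.util.concurrent.CountDownLatch

class ServerSketch {
  private val shutdownLatch = new CountDownLatch(1)

  def awaitShutdown(): Unit = shutdownLatch.await()   // blocks the caller

  def shutdown(): Unit = {
    // ... release resources ...
    shutdownLatch.countDown()                          // wakes up awaitShutdown()
  }
}

object ServerSketch {
  def main(args: Array[String]): Unit = {
    val server = new ServerSketch
    sys.addShutdownHook(server.shutdown())             // mirrors Exit.addShutdownHook
    server.awaitShutdown()                             // main thread parks here until shutdown
  }
}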

Next, we focus on KafkaServer's startup method.

KafkaServer::startup

/**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup(): Unit = {
    ...
  }

This function is long, over 150 lines. According to its comment, its job is to bring up a single Kafka server instance. It mainly does three things: instantiate the LogManager, the SocketServer, and the KafkaRequestHandlers.

initZkClient

Look at this line first. The comment and the function name make it clear: it initializes the ZooKeeper client and applies some configuration.

// KafkaServer.scala
/* setup zookeeper */
initZkClient(time)

Following the call chain:

private def initZkClient(time: Time): Unit = {
    info(s"Connecting to zookeeper on ${config.zkConnect}")

    def createZkClient(zkConnect: String, isSecure: Boolean) = {
      KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
        config.zkMaxInFlightRequests, time, name = Some("Kafka server"), zkClientConfig = Some(zkClientConfig))
    }
    ...
    _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
}

  // KafkaZkClient.scala
  def apply(connectString: String,  ...) = {
    val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests,
      time, metricGroup, metricType, name, zkClientConfig)
    new KafkaZkClient(zooKeeperClient, isSecure, time)
  }

As KafkaZkClient::apply shows, the code creates a ZooKeeperClient and uses it to build a KafkaZkClient. From the two classes' comments, the former handles interaction with ZooKeeper, while the latter is Kafka's own ZooKeeper client.

Internally, ZooKeeperClient holds an org.apache.zookeeper.ZooKeeper instance, which is the actual client Kafka uses to talk to ZooKeeper.

// ZooKeeperClient.scala
  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
...
  // Fail-fast if there's an error during construction (so don't call initialize, which retries forever)
  @volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher,
    clientConfig)
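For illustration, here is a small, hypothetical sketch of using the raw org.apache.zookeeper.ZooKeeper client directly, the same class that ZooKeeperClient wraps. It assumes a ZooKeeper server on localhost:2181; the name RawZkSketch is made up.

import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object RawZkSketch {
  def main(args: Array[String]): Unit = {
    val connected = new CountDownLatch(1)
    // Watcher gets session events; here we only wait for the initial connection
    val watcher = new Watcher {
      override def process(event: WatchedEvent): Unit =
        if (event.getState == Watcher.Event.KeeperState.SyncConnected) connected.countDown()
    }
    val zk = new ZooKeeper("localhost:2181", 6000, watcher)  // connectString, sessionTimeoutMs, watcher
    connected.await()
    println(zk.getChildren("/", false))                       // list the root znodes
    zk.close()
  }
}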

LogManager

As the comment on KafkaServer::startup mentions, startup instantiates the LogManager. The following code in the function starts it:

/* start log manager */
logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
logManager.startup()

See the gitbook page kafka-log-LogManager.html and the article "Kafka LogManager详解(六)" for more on what this class does.

Let's look at the class comment. It says:

  • This class is the entry point to Kafka's log management subsystem; it is responsible for log creation, retrieval, and cleaning. All read and write operations are delegated to the individual Log instances.
  • The LogManager maintains logs in one or more directories. New logs are created in the data directory with the fewest logs. No attempt is made to move partitions after the fact or to balance them based on size or I/O rate.
  • A background thread handles log retention by periodically truncating excess log segments.
/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *
 * A background thread handles log retention by periodically truncating excess log segments.
 */
@threadsafe
class LogManager(logDirs: Seq[File],
    ...

LogManager's first parameter, logDirs, specifies the log storage directories; in the debugger we can see the path is /tmp/kafka-logs.

(Screenshot: log directory path)

In that directory we can see the topic I created earlier, demo_topic, along with its log files.

(Screenshot: log directory contents)

SocketServer

Another job of KafkaServer::startup is to start the SocketServer; the following code does that:

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

Looking at the source of this class, SocketServer handles new connections, requests, and responses to and from the broker. Kafka supports two types of request planes:

  1. Data plane: handles requests from clients and other brokers in the cluster.
  • The threading model is:
  • 1 Acceptor thread per listener, which handles new connections
  • Each Acceptor has N Processor threads, each with its own selector, reading requests from sockets
  • M Handler threads, which handle requests and produce responses back to the processor threads for writing
  • Multiple data planes can be configured by specifying multiple ","-separated endpoints for "listeners" in KafkaConfig.
  2. Control plane: handles requests from the controller. This is optional and can be enabled by setting "control.plane.listener.name"; if it is not set, controller requests are handled by the data plane.
  • The threading model is:
  • 1 Acceptor thread that handles new connections
  • The Acceptor has 1 Processor thread with its own selector, reading requests from the socket
  • 1 Handler thread, which handles requests and produces responses back to the processor thread for writing

Seeing Acceptor, Processor, and Handler, anyone familiar with the Reactor pattern will recognize it immediately: Kafka's network model is laid out right here (a rough sketch of this split follows the class comment below).

Related: the Kafka controller

/**
 * Handles new connections, requests and responses to and from broker.
 * Kafka supports two types of request planes :
 *  - data-plane :
 *    - Handles requests from clients and other brokers in the cluster.
 *    - The threading model is
 *      1 Acceptor thread per listener, that handles new connections.
 *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
 *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
 *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
 *  - control-plane :
 *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
 *      If not configured, the controller requests are handled by the data-plane.
 *    - The threading model is
 *      1 Acceptor thread that handles new connections
 *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
 *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
 */
class SocketServer(val config: KafkaConfig,
                   val metrics: Metrics,
                   val time: Time,
                   val credentialProvider: CredentialProvider)
  extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
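To make the threading model concrete, here is a rough, hypothetical sketch of the Acceptor/Processor split using plain java.nio. It is not SocketServer's actual code; the port 9092 and the thread counts are made up for illustration, and the Handler side is only described in comments.

import java.net.InetSocketAddress
import java.nio.channels.{ServerSocketChannel, SocketChannel}
import java.util.concurrent.{ArrayBlockingQueue, Executors}

object ReactorSketch {
  def main(args: Array[String]): Unit = {
    val newConnections = new ArrayBlockingQueue[SocketChannel](64)

    // Acceptor: one thread per listener; it only accepts connections and hands them off.
    val acceptor = new Thread(new Runnable {
      def run(): Unit = {
        val serverChannel = ServerSocketChannel.open().bind(new InetSocketAddress(9092))
        while (true) newConnections.put(serverChannel.accept())
      }
    }, "acceptor")
    acceptor.setDaemon(true)
    acceptor.start()

    // Processors: N threads; each takes ownership of connections and, in the real server,
    // registers them with its own selector to read requests from the sockets.
    val processors = Executors.newFixedThreadPool(3)
    for (_ <- 0 until 3) processors.submit(new Runnable {
      def run(): Unit = while (true) {
        val channel = newConnections.take()
        // read request bytes from `channel`, hand the request to a Handler thread,
        // then later write the Handler's response back to the same channel
      }
    })
    // The M Handler threads are omitted here; they would execute the request and
    // produce a response back to the owning processor for writing.
  }
}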

We will dig into these components in more depth later.

Summary

We have briefly walked through Kafka's startup flow and met LogManager and SocketServer, which relate to Kafka's log management and network model respectively. These two areas are what we will focus on next.
