Kafka源码分析-启动流程

  • 我们一般都是使用bin/kafka-server-start.sh脚本来启动;
  • bin/kafka-server-start.sh可以知道此脚本用法: echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" (1) server.properties为配置文件路径, 这里config/server.properties有一个配置文件的模板,里面就是一行行的key=value; (2) --override property=value是若干个可项的参数, 用来覆盖server.properties配置文件中同名的配置项;
  • bin/kafka-server-start.sh 最后一行exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"可知, Kafka启动时的入口类为kafka.Kafka, 我们直接来看这个类;

Kafka启动入口类:kafk.Kafak

  • 所在文件: core/src/main/scala/kafka/Kafka.scala
  • 定义: object Kafka extends Logging
  • main函数:
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown //捕获control-c中断,停止当前服务
        }
      })

      kafkaServerStartable.startup //启动服务
      kafkaServerStartable.awaitShutdown //等待服务结束

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

Kafka启动代理类:KafkaServerStartable

  • 伴生对象: object KafkaServerStartable 提供fromProps方法来创建 KafkaServerStartable;
  • KafkaServerStartable对象创建时会同时创建 KafkaServer, 这才是真正的主角;
def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Runtime.getRuntime.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown

下一篇我们来开始介绍Kafka基础组件和辅助类库简介

Kafka源码分析-汇总

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java达人

Spring声明式事务的一个注意点及原理简析

以前我们说过,Spring通过ThreadLocal机制解除了事务管理模块与数据访问层的紧密耦合,提高了模块的可重用性,也保证了多线程环境下的对connecti...

2236
来自专栏Golang语言社区

Golang测试技术

本篇文章内容来源于Golang核心开发组成员Andrew Gerrand在Google I/O 2014的一次主题分享“Testing Techniques”,...

4116
来自专栏Golang语言社区

Golang测试技术

本篇文章内容来源于Golang核心开发组成员Andrew Gerrand在Google I/O 2014的一次主题分享“Testing Techniques”,...

3987
来自专栏一个爱瞎折腾的程序猿

asp.net core使用Swashbuckle.AspNetCore(swagger)生成接口文档

开局一张图,然后开始编,一些基本的asp.net core东西就不再赘述,本文只对Swashbuckle.AspNetCore的几个使用要点进行描述。

1771
来自专栏Rainbond开源「容器云平台」

Docker源码分析之容器日志处理与log-driver实现

1623
来自专栏Linyb极客之路

log4j配置学习总结

Log4j有三个主要的组件:Loggers(记录器),Appenders (输出源)和Layouts(布局)。这里可简单理解为日志类别,日志要输出的地方和日志以...

1293
来自专栏猿人谷

使用bash编写Linux shell脚本--复合命令

除了最简单的脚本,你很少想要执行每一个命令。执行一组命令或者重复执行一组命令若干次比执行单个命令更加有助。复合命令是将命令封装在一组其他命令中。 从可读性来说,...

32910
来自专栏更流畅、简洁的软件开发方式

【开源】QuickPager ASP.NET2.0分页控件 v2.0.0.2版本。

下载地址:http://files.cnblogs.com/jyk/Page2.0.0.2_080701.rar 这回只有 dll文件。请把包里的文件拷贝到...

2106
来自专栏冷冷

【系统日志】log4j配置学习总结

Log4j有三个主要的组件:Loggers(记录器),Appenders (输出源)和Layouts(布局)。这里可简单理解为日志类别,日志要输出的地方和日志以...

1936
来自专栏偏前端工程师的驿站

.Net魔法堂:log4net详解

一、作用                              提供一个记录日志的框架,可以将日志信息记录到文件、控制台、Windows事件日志和数据库(M...

2135

扫码关注云+社区

领取腾讯云代金券