前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka源码分析-启动流程

Kafka源码分析-启动流程

作者头像
扫帚的影子
发布2018-09-05 17:14:56
1.3K0
发布2018-09-05 17:14:56
举报
  • 我们一般都是使用bin/kafka-server-start.sh脚本来启动;
  • bin/kafka-server-start.sh可以知道此脚本用法: echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" (1) server.properties为配置文件路径, 这里config/server.properties有一个配置文件的模板,里面就是一行行的key=value; (2) --override property=value是若干个可项的参数, 用来覆盖server.properties配置文件中同名的配置项;
  • bin/kafka-server-start.sh 最后一行exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"可知, Kafka启动时的入口类为kafka.Kafka, 我们直接来看这个类;

Kafka启动入口类:kafk.Kafak

  • 所在文件: core/src/main/scala/kafka/Kafka.scala
  • 定义: object Kafka extends Logging
  • main函数:
代码语言:javascript
复制
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {
        override def run() = {
          kafkaServerStartable.shutdown //捕获control-c中断,停止当前服务
        }
      })

      kafkaServerStartable.startup //启动服务
      kafkaServerStartable.awaitShutdown //等待服务结束

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

Kafka启动代理类:KafkaServerStartable

  • 伴生对象: object KafkaServerStartable 提供fromProps方法来创建 KafkaServerStartable;
  • KafkaServerStartable对象创建时会同时创建 KafkaServer, 这才是真正的主角;
代码语言:javascript
复制
def startup() {
    try {
      server.startup()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStartable startup. Prepare to shutdown", e)
        // KafkaServer already calls shutdown() internally, so this is purely for logging & the exit code
        System.exit(1)
    }
  }

  def shutdown() {
    try {
      server.shutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServerStable shutdown. Prepare to halt", e)
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Runtime.getRuntime.halt(1)
    }
  }

  /**
   * Allow setting broker state from the startable.
   * This is needed when a custom kafka server startable want to emit new states that it introduces.
   */
  def setServerState(newState: Byte) {
    server.brokerState.newState(newState)
  }

  def awaitShutdown() = 
    server.awaitShutdown

下一篇我们来开始介绍Kafka基础组件和辅助类库简介

Kafka源码分析-汇总
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016.12.23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Kafka启动入口类:kafk.Kafak
  • Kafka启动代理类:KafkaServerStartable
  • 下一篇我们来开始介绍Kafka基础组件和辅助类库简介
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档