Playing with the Elasticsearch Source: The ES Startup Flow in One Diagram

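Introduction

Let's go straight to the diagram.

In the diagram above, dashed lines mean stepping into a sub-flow and solid lines mean moving on to the next step. Every step is numbered to make the walkthrough below easier to follow. First, a brief introduction to the main classes involved in startup:

  • org.elasticsearch.bootstrap.Elasticsearch: the startup entry point. The main method lives in this class; its logic corresponds to the green part of the diagram.
  • org.elasticsearch.bootstrap.Bootstrap: contains the main startup code; its logic corresponds to the red part of the diagram.
  • org.elasticsearch.node.Node: represents a node in the cluster; its logic corresponds to the blue part of the diagram.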

Walkthrough

  1. The main method
  2. Install an empty (grant-all) SecurityManager:

// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
// grant all permissions so that we can later set the security manager to the one that we want

Add a StatusConsoleListener to STATUS_LOGGER:

We want to detect situations where we touch logging before the configuration is loaded. If we do this, Log4j will status log an error message at the error level. With this error listener, we can capture if this happens. More broadly, we can detect any error-level status log message which likely indicates that something is broken. The listener is installed immediately on startup, and then when we get around to configuring logging we check that no error-level log messages have been logged by the status logger. If they have we fail startup and any such messages can be seen on the console.

Instantiate Elasticsearch:

Elasticsearch() {        
        super("starts elasticsearch", () -> {}); // () -> {} is the callback that runs before main
       // declare the version, daemonize, pidfile, and quiet options
       versionOption = parser.acceptsAll(Arrays.asList("V", "version"),"Prints elasticsearch version information and exits");
       daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),"Starts Elasticsearch in the background")
           .availableUnless(versionOption);
       pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),"Creates a pid file in the specified path on start")
           .availableUnless(versionOption)
           .withRequiredArg()
           .withValuesConvertedBy(new PathConverter());
       quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),"Turns off standard output/error streams logging in console")
           .availableUnless(versionOption)
           .availableUnless(daemonizeOption);
}

3. Register a ShutdownHook that closes the command on JVM shutdown and prints any IOException to the terminal

shutdownHookThread = new Thread(() -> {                
    try {                    
        this.close();
   } catch (final IOException e) {                    
        try (
           StringWriter sw = new StringWriter();
           PrintWriter pw = new PrintWriter(sw)) {
           e.printStackTrace(pw);
           terminal.println(sw.toString());
       } catch (final IOException impossible) {
           // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter
           // say that an exception here is impossible
           throw new AssertionError(impossible);
       }
    }
});
Runtime.getRuntime().addShutdownHook(shutdownHookThread);
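
The hook pattern above can be tried in isolation. The following is a minimal sketch, not the Elasticsearch code: the class name ShutdownHookSketch, the helper names stackTraceOf and newHook, and the Consumer<String> used in place of the terminal are all assumptions made for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Consumer;

// Hypothetical stand-in for the Command shutdown hook; names are illustrative only.
final class ShutdownHookSketch {

    /** Renders a throwable's stack trace as a String so it can be printed in one call. */
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        try (PrintWriter pw = new PrintWriter(sw)) {
            t.printStackTrace(pw);
        }
        return sw.toString();
    }

    /** Builds (but does not register) a hook thread that closes the resource and reports failures. */
    static Thread newHook(Closeable resource, Consumer<String> sink) {
        return new Thread(() -> {
            try {
                resource.close();
            } catch (IOException e) {
                sink.accept(stackTraceOf(e));
            }
        });
    }

    public static void main(String[] args) {
        Closeable failing = () -> { throw new IOException("simulated close failure"); };
        // Registered hooks run when the JVM begins shutting down.
        Runtime.getRuntime().addShutdownHook(newHook(failing, System.err::println));
    }
}
```

Keeping hook construction separate from registration makes the hook body testable by calling run() directly, without having to shut down the JVM.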

Then beforeMain.run() is called, which is simply the () -> {} lambda created when the Elasticsearch object was instantiated above.

4. Enter the mainWithoutErrorHandling method of the Command class

void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
    // parse the given command-line arguments according to the option specs registered with the parser
    final OptionSet options = parser.parse(args);
    if (options.has(helpOption)) {
        printHelp(terminal);
        return;
    }
    if (options.has(silentOption)) { // print as little as possible to the terminal
        terminal.setVerbosity(Terminal.Verbosity.SILENT);
    } else if (options.has(verboseOption)) { // print detailed output to the terminal
        terminal.setVerbosity(Terminal.Verbosity.VERBOSE);
    } else {
        terminal.setVerbosity(Terminal.Verbosity.NORMAL);
    }
    execute(terminal, options);
}

5. Enter the execute method of EnvironmentAwareCommand

protected void execute(Terminal terminal, OptionSet options) throws Exception {        
    final Map<String, String> settings = new HashMap<>();        
    for (final KeyValuePair kvp : settingOption.values(options)) {            
        if (kvp.value.isEmpty()) {                
            throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty");
       }            
        if (settings.containsKey(kvp.key)) {                
            final String message = String.format(
                       Locale.ROOT,
                        "setting [%s] already set, saw [%s] and [%s]",
                       kvp.key,
                       settings.get(kvp.key),
                       kvp.value);                
            throw new UserException(ExitCodes.USAGE, message);
       }
       settings.put(kvp.key, kvp.value);
    }        
    // make sure each of these settings is present; if one has not been set, read it from the system property
    putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
    putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
    putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");

    execute(terminal, options, createEnv(terminal, settings));
}
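
The body of putSystemPropertyIfSettingIsMissing is not shown in this article. As a rough sketch of the fallback described by the comment, and assuming the helper only fills in a key that is absent (the real method may behave differently, for example by rejecting a value supplied both ways), it could look like this. The class name SettingsFallbackSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Assumed behavior only: fill a missing setting from a system property.
final class SettingsFallbackSketch {

    /**
     * Copies the system property {@code key} into {@code settings} under {@code setting},
     * but only when the property is defined and the setting is not already present.
     */
    static void putSystemPropertyIfSettingIsMissing(Map<String, String> settings, String setting, String key) {
        String value = System.getProperty(key);
        if (value != null && !settings.containsKey(setting)) {
            settings.put(setting, value);
        }
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("path.logs", "/var/log/es"); // explicitly provided setting
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        System.out.println(settings);
    }
}
```

Running it with -Des.path.home=/opt/es would add path.home to the map while leaving the explicit path.logs untouched.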

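6. Enter the prepareEnvironment method of InternalSettingsPreparer, which reads elasticsearch.yml and creates the Environment. There are many details here; they will be covered later.

[Figure: the Environment object]

7. Check for the -V (--version) option; if it is absent, get ready to enter the init flow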

 protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {        
     if (options.nonOptionArguments().isEmpty() == false) {            
         throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments());
    }        
     if (options.has(versionOption)) { 
         // if the -V option is present, print the version and return immediately
        terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot())
                   + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date()
                   + ", JVM: " + JvmInfo.jvmInfo().version());            
         return;
    }        
     final boolean daemonize = options.has(daemonizeOption);        
     final Path pidFile = pidfileOption.value(options);        
     final boolean quiet = options.has(quietOption);        
     try {
           init(daemonize, pidFile, quiet, env);
    } catch (NodeValidationException e) {            
         throw new UserException(ExitCodes.CONFIG, e.getMessage());
    }
}

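8. Call Bootstrap.init

9. Instantiate Bootstrap. The constructor creates keepAliveThread, a non-daemon thread that blocks on a latch and therefore keeps the JVM alive until shutdown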

Bootstrap() {
       keepAliveThread = new Thread(new Runnable() {            
            @Override
           public void run() {                
                try {
                   keepAliveLatch.await();
               } catch (InterruptedException e) {                    
                    // bail out
               }
           }
       }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
       keepAliveThread.setDaemon(false);        
        // keep this thread alive (non daemon thread) until we shutdown
       Runtime.getRuntime().addShutdownHook(new Thread() {            
            @Override
           public void run() {
               keepAliveLatch.countDown();
           }
       });
   }
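
The keep-alive mechanism can be reproduced with nothing but the JDK. This is a minimal sketch of the same pattern rather than the Bootstrap class itself; the class name KeepAliveSketch and its helper methods (start, release, awaitExit) are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative only: a non-daemon thread parked on a latch keeps the JVM running.
final class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    KeepAliveSketch() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // blocks until release() is called
            } catch (InterruptedException e) {
                // bail out
            }
        }, "keepAlive-sketch");
        keepAliveThread.setDaemon(false); // a non-daemon thread prevents JVM exit
    }

    void start() { keepAliveThread.start(); }

    void release() { keepAliveLatch.countDown(); }

    boolean isAlive() { return keepAliveThread.isAlive(); }

    /** Waits up to the given time for the thread to finish; returns true if it has exited. */
    boolean awaitExit(long millis) {
        try {
            keepAliveThread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }

    public static void main(String[] args) {
        KeepAliveSketch keepAlive = new KeepAliveSketch();
        // As in the article, a shutdown hook releases the latch.
        Runtime.getRuntime().addShutdownHook(new Thread(keepAlive::release));
        keepAlive.start();
        System.out.println("main returns, but the JVM keeps running until it is terminated (e.g. Ctrl+C)");
    }
}
```

Once main returns, only the parked non-daemon thread is left; the process therefore stays up until something external triggers shutdown, at which point the hook counts the latch down.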

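10. Load the elasticsearch.keystore file and re-create the Environment, then call the static method LogConfigurator.configure, which reads log4j2.properties from the config directory and configures Log4j accordingly

11. Create the pid file and check the Lucene version, throwing an AssertionError if it does not match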

 private static void checkLucene() {        
     if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) {            
         throw new AssertionError("Lucene version mismatch this version of Elasticsearch requires lucene version ["
               + Version.CURRENT.luceneVersion + "]  but the current lucene version is [" + org.apache.lucene.util.Version.LATEST + "]");
    }
}

12. Install ElasticsearchUncaughtExceptionHandler, which logs uncaught exceptions at fatal level

// install the default uncaught exception handler; must be done before security is
// initialized as we do not want to grant the runtime permission
// setDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(
    new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
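
To see what a default uncaught exception handler does, here is a small self-contained sketch. It is not ElasticsearchUncaughtExceptionHandler: the class name FatalHandlerSketch, the message format, and the runAndFail helper are invented for this example; only Thread.setDefaultUncaughtExceptionHandler is the real JDK call used in the article.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical handler: records and prints a "fatal" message tagged with a node name.
final class FatalHandlerSketch implements Thread.UncaughtExceptionHandler {
    private final Supplier<String> nodeName;
    final AtomicReference<String> lastMessage = new AtomicReference<>();

    FatalHandlerSketch(Supplier<String> nodeName) {
        this.nodeName = nodeName;
    }

    @Override
    public void uncaughtException(Thread t, Throwable e) {
        String message = "[" + nodeName.get() + "] fatal error in thread [" + t.getName() + "]: " + e;
        lastMessage.set(message);
        System.err.println(message);
    }

    /** Starts a thread whose task throws, then waits for that thread to die. */
    static void runAndFail(String threadName) {
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("simulated failure");
        }, threadName);
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The node name is resolved lazily through a supplier, as in the article's snippet.
        Thread.setDefaultUncaughtExceptionHandler(new FatalHandlerSketch(() -> "node-1"));
        runAndFail("worker-1");
    }
}
```

The handler runs on the dying thread itself, so by the time join() returns the message has already been recorded.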

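13. Enter Bootstrap.setup

14. spawner.spawnNativePluginControllers(environment): attempts to spawn the controller (native controller) daemon for each given module. The spawned processes stay connected to this JVM through their stdin, stdout, and stderr streams, but references to those streams are not available to code outside this package.

15. Initialize native resources with initializeNatives(): check whether the process is running as root and throw an exception if so; perform the system call and mlockAll checks; try to set the maximum number of threads, the maximum virtual memory size, the maximum number of file descriptors, and so on. Then initialize the probes with initializeProbes(), which are used to monitor the operating system, the process, and the JVM.

16. Add another ShutdownHook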

if (addShutdownHook) {
   Runtime.getRuntime().addShutdownHook(new Thread() {                
        @Override
        public void run() {
            try {
              IOUtils.close(node, spawner);
              LoggerContext context = (LoggerContext) LogManager.getContext(false);
              Configurator.shutdown(context);
           } catch (IOException ex) {                        
                throw new ElasticsearchException("failed to stop node", ex);
           }
       }
   });
}

17. This step is fairly simple; just read the code

 try {            
     // look for jar hell
    JarHell.checkJarHell();
} catch (IOException | URISyntaxException e) {            
     throw new BootstrapException(e);
}        
 // Log ifconfig output before SecurityManager is installed
IfConfig.logIfNecessary();        
 // install SM after natives, shutdown hooks, etc.
try {
    Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {            
     throw new BootstrapException(e);
 }
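
The article does not show what JarHell.checkJarHell() does internally. The general idea of a "jar hell" check is to detect the same class being provided by more than one jar. The sketch below illustrates only that idea under a strong simplification: it takes a prebuilt map from jar name to class names instead of scanning the real classpath, and the class JarHellSketch and its exception message are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of duplicate-class detection; not the Elasticsearch JarHell class.
final class JarHellSketch {

    /** Throws IllegalStateException if any class name is provided by more than one jar. */
    static void checkJarHell(Map<String, List<String>> classesByJar) {
        Map<String, String> firstJarByClass = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> jar : classesByJar.entrySet()) {
            for (String className : jar.getValue()) {
                String previousJar = firstJarByClass.putIfAbsent(className, jar.getKey());
                if (previousJar != null && !previousJar.equals(jar.getKey())) {
                    throw new IllegalStateException(
                        "duplicate class [" + className + "] in [" + previousJar + "] and [" + jar.getKey() + "]");
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> classpath = new LinkedHashMap<>();
        classpath.put("a.jar", List.of("org.example.A", "org.example.Shared"));
        classpath.put("b.jar", List.of("org.example.B", "org.example.Shared"));
        try {
            checkJarHell(classpath);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Failing fast at startup is the point of such a check: a duplicated class would otherwise surface later as hard-to-diagnose behavior that depends on classpath order.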

18. Instantiate Node, overriding the validateNodeBeforeAcceptingRequests method. Construction consists of three main parts: first, start the plugin service and load the required plugins (ES supports plugins as an extension mechanism, which is one of its highlights); second, configure the node environment; and finally, load the modules through Guice. Steps 22 to 32 below are the detailed steps.

19. Enter Bootstrap.start

20. node.start starts the node

21. keepAliveThread.start

22. The first step of instantiating Node: create the NodeEnvironment

23. Generate the nodeId, then log the nodeId, the nodeName, the JVM info, and the process info

24. Create the PluginsService object; during creation it reads and loads all modules and plugins

25. Create the Environment yet again

// create the environment based on the finalized (processed) view of the settings
// this is just to makes sure that people get the same settings, no matter where they ask them from
this.environment = new Environment(this.settings, environment.configFile());

26. Create the ThreadPool, then set the ThreadContext on the DeprecationLogger

27. Create the NodeClient, which is used to execute actions

28. Create the services: ResourceWatcherService, NetworkService, ClusterService, IngestService, ClusterInfoService, UsageService, MonitorService, CircuitBreakerService, MetaStateService, IndicesService, MetaDataIndexUpgradeService, TemplateUpgradeService, TransportService, ResponseCollectorService, SearchTransportService, NodeService, SearchService, PersistentTasksClusterService

29. Create and add the modules: ScriptModule, AnalysisModule, SettingsModule, pluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule

30. Bind and inject objects with Guice

31. Initialize the NodeClient

  client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}),
                   () -> clusterService.localNode().getId());

32. Initialize the REST handlers. This step is very important and will be covered in a dedicated article later

if (NetworkModule.HTTP_ENABLED.get(settings)) {
        logger.debug("initializing HTTP handlers ...");
        // initialize the HTTP handlers
        actionModule.initRestHandlers(() -> clusterService.state().nodes());
}

33. Change the state to State.STARTED

34. Start the pluginLifecycleComponents

35. Obtain each of the following objects from the injector and call its start() method (which actually runs each class's doStart method): LifecycleComponent, IndicesService, IndicesClusterStateService, SnapshotsService, SnapshotShardsService, RoutingService, SearchService, MonitorService, NodeConnectionsService, ResourceWatcherService, GatewayService, Discovery, TransportService

36. Start HttpServerTransport and TransportService and bind their ports

if (WRITE_PORTS_FILE_SETTING.get(settings)) {            
       if (NetworkModule.HTTP_ENABLED.get(settings)) {
               HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
               writePortsFile("http", http.boundAddress());
       }
       TransportService transport = injector.getInstance(TransportService.class);
       writePortsFile("transport", transport.boundAddress());
}
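
Step 35 relies on a template-method pattern: callers invoke start(), and each class supplies its own doStart(). The following is a minimal sketch of that pattern under simplifying assumptions; it is not the Elasticsearch lifecycle implementation, and LifecycleSketch, Component, and NamedService are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative template-method lifecycle; the real classes have more states and listeners.
final class LifecycleSketch {

    enum State { INITIALIZED, STARTED }

    /** Base class: start() guards the state transition, doStart() holds the real work. */
    abstract static class Component {
        private State state = State.INITIALIZED;

        final void start() {
            if (state == State.STARTED) {
                return; // already started, nothing to do
            }
            doStart();
            state = State.STARTED;
        }

        State state() {
            return state;
        }

        protected abstract void doStart();
    }

    /** A hypothetical service that records its name when started. */
    static final class NamedService extends Component {
        private final String name;
        private final List<String> log;

        NamedService(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        protected void doStart() {
            log.add(name);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Component> services = List.of(
            new NamedService("IndicesService", log),
            new NamedService("TransportService", log));
        services.forEach(Component::start); // start order follows list order
        System.out.println(log);
    }
}
```

Putting the state check in the base class means every service gets the same idempotent start() for free, while the node only needs to know the order in which to call it.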

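Summary

  • This article only covers the overall ES startup flow; many of the details will be explored in depth later in this series.
  • The ES source is fairly hard to read. The flows are long and it is easy to forget the earlier parts by the time you reach the later ones, so drawing diagrams helps to sort out the whole process.
  • Why read open-source code? (1) Knowing the underlying implementation lets you use the software better and locate and fix problems quickly. (2) You learn from other people's excellent code and their ways of solving problems, which improves your own system design skills. (3) You get the opportunity to extend and modify it.

This article was shared from the WeChat official account 左手java右手go (gh_e7502321bf3d). Author: 邱旭东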

Originally published: 2019-02-20
