Elasticsearch Internals Series: Source Code Analysis of the Node Startup Process

Introduction to Elasticsearch

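Elasticsearch is an open-source distributed search engine that provides near-real-time search and powerful aggregation-based analytics. Combined with Elastic's other official components (Beats, Logstash, Kibana), it forms the Elastic Stack. The Elastic Stack is a complete solution for data ingestion, cleansing, storage, querying and visualization across many use cases, and it is widely used for search, log analysis, statistical analysis and related work.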

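An Elasticsearch deployment is a distributed cluster made up of multiple nodes, and each node is called a Node. This article is based on Elasticsearch v6.4.3. It focuses on the Node startup process and also briefly outlines ES's main internal modules, thread pools, and so on.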

The Elasticsearch startup process

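The startup flow mainly involves three classes: Elasticsearch, Bootstrap and Node. It consists of three steps:

  • Load the local environment: read the command-line arguments and configuration files, and build the local environment configuration
  • Create the Node: create the node instance and the various service objects, and inject the functional modules
  • Start the Node: start the services and join the cluster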

Before going through these three steps in detail, let's first look at Elasticsearch's main entry point.

Main entry point

The startup script (bin/elasticsearch) shows that the main entry point is org.elasticsearch.bootstrap.Elasticsearch.

Startup script

  exec \
    "$JAVA" \
    $ES_JAVA_OPTS \
    -Des.path.home="$ES_HOME" \
    -Des.path.conf="$ES_PATH_CONF" \
    -Des.distribution.flavor="$ES_DISTRIBUTION_FLAVOR" \
    -Des.distribution.type="$ES_DISTRIBUTION_TYPE" \
    -cp "$ES_CLASSPATH" \
    org.elasticsearch.bootstrap.Elasticsearch \
    "$@"

Main entry point

PATH

org.elasticsearch.bootstrap.Elasticsearch#main

CODE

    /**
     * Main entry point for starting elasticsearch
     */
    public static void main(final String[] args) throws Exception {
        // 1. Install a security manager that grants all permissions
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // grant all permissions so that we can later set the security manager to the one that we want
            }
        });
        // 2. Register the log error listener
        LogConfigurator.registerErrorListener();
        // 3. Create the Elasticsearch command object
        final Elasticsearch elasticsearch = new Elasticsearch();
        // 4. Parse the arguments and run the command; exit on a non-OK status
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        if (status != ExitCodes.OK) {
            exit(status);
        }
    }
    
    static int main(final String[] args, final Elasticsearch elasticsearch, final Terminal terminal) throws Exception {
        return elasticsearch.main(args, terminal);
    }

Analysis

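args holds the command-line arguments. The method performs the following steps:

  1. Install a security manager that grants all permissions. In Java, a SecurityManager checks whether the application may access restricted resources such as files and sockets. The checkPermission override here permits every operation. As the source comment says, this is done so that the desired security manager can be installed later.
  2. Register the log error listener. Log listening is enabled as early as possible so that no log output is lost.
  3. Create the Elasticsearch object. As shown in the figure below, Elasticsearch extends EnvironmentAwareCommand, which in turn extends Command. The Elasticsearch() constructor calls the superclass constructors. These register the command-line parsing rules that are used later when the arguments are parsed.
  4. Call elasticsearch.main() (actually Command#main) to perform further initialization. If initialization fails, the process exits with the returned non-OK status.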
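Steps 3 and 4 form a template-method hierarchy. Command#main parses the options and calls execute, EnvironmentAwareCommand#execute builds the environment, and Elasticsearch#execute does the real work. The sketch below shows that shape with hypothetical, heavily simplified classes. SketchCommand and its subclasses are not Elasticsearch classes, and the option handling and the usage exit code 64 are assumptions made for illustration:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-ins for Command -> EnvironmentAwareCommand -> Elasticsearch.
// None of these classes exist in Elasticsearch; they only mirror the call chain.
abstract class SketchCommand {
    static final int OK = 0;
    static final int USAGE = 64; // assumption: sysexits-style "usage" code

    /** Template method: validate the options, then hand off to the subclass. */
    final int main(String[] args) {
        Set<String> known = knownOptions();
        for (String arg : args) {
            if (!known.contains(arg)) {
                System.err.println("ERROR: unknown option " + arg);
                return USAGE; // execute() is never reached for bad input
            }
        }
        execute(new HashSet<>(Arrays.asList(args)));
        return OK;
    }

    protected abstract Set<String> knownOptions();

    protected abstract void execute(Set<String> options);
}

abstract class SketchEnvironmentAwareCommand extends SketchCommand {
    /** Builds an "environment" before delegating, like EnvironmentAwareCommand#execute. */
    @Override
    protected final void execute(Set<String> options) {
        // hypothetical default; the startup script passes -Des.path.home
        String pathHome = System.getProperty("es.path.home", "/tmp/es-home");
        execute(options, pathHome);
    }

    protected abstract void execute(Set<String> options, String pathHome);
}

class SketchElasticsearch extends SketchEnvironmentAwareCommand {
    String pathHome;
    boolean daemonize;

    @Override
    protected Set<String> knownOptions() {
        return new HashSet<>(Arrays.asList("-d", "-q"));
    }

    @Override
    protected void execute(Set<String> options, String pathHome) {
        this.pathHome = pathHome;
        this.daemonize = options.contains("-d");
        // the real Elasticsearch#execute would call init(...) -> Bootstrap.init(...) here
    }
}
```

Calling `new SketchElasticsearch().main(new String[] {"-d"})` returns 0 and records daemonize = true. An unknown option returns the usage code without reaching execute. This roughly mirrors how the Command#main excerpt in Step 1 turns an OptionException into ExitCodes.USAGE.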

Step 1: Load the local environment (Elasticsearch initialization)

PATH

elasticsearch\libs\cli\src\main\java\org\elasticsearch\cli\Command.java

CODE

    /** Parses options for this command from args and executes it. */
    public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {

            shutdownHookThread = new Thread(() -> {
                try {
                    // Elasticsearch#close
                    this.close();
                } catch (final IOException e) {
                    try (
                        StringWriter sw = new StringWriter();
                        PrintWriter pw = new PrintWriter(sw)) {
                        // closing failed: print the stack trace
                        e.printStackTrace(pw);
                        terminal.println(sw.toString());
                    } 
                }
            });
            // 1. Register the hook thread with the runtime; it runs when the process exits
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }

        try {
            // 2. Parse the command-line arguments and execute the command
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            printHelp(terminal);
            terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } 
        return ExitCodes.OK;
    }

Analysis

Command#main has two main steps:

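  1. addShutdownHook: registers a callback thread with the runtime. When the process exits, the thread calls Elasticsearch#close, and it prints the stack trace if closing fails.
  2. mainWithoutErrorHandling: handles some of the command-line options (-h, -v, -s), then calls EnvironmentAwareCommand#execute: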
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        final Map<String, String> settings = new HashMap<>();
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        execute(terminal, options, createEnv(terminal, settings));
    }
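  • Read the settings passed as JVM system properties (es.path.data, es.path.home, es.path.logs) into settings, as shown in the figure below.
  • Call createEnv. It uses prepareEnvironment to read the ES configuration file (config/elasticsearch.yml) and builds an Environment that holds the paths and the ES settings.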
    public static Environment prepareEnvironment(Settings input, Terminal terminal, Map<String, String> properties, Path configPath) {
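        Settings.Builder output = Settings.builder();
        // (abridged) a preliminary Environment is built first, only to locate the config directory
        Environment environment = new Environment(output.build(), configPath);
        Path path = environment.configFile().resolve("elasticsearch.yml");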
        if (Files.exists(path)) {
            try {
                output.loadFromPath(path);
            } catch (IOException e) {
                throw new SettingsException("Failed to load settings from " + path.toString(), e);
            }
        }
        return new Environment(output.build(), configPath);
    }
  • Call Elasticsearch#execute. It reads the daemonize, pidFile and quiet values, then calls Elasticsearch#init -> Bootstrap.init to initialize Bootstrap.
    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        final boolean daemonize = options.has(daemonizeOption);
        final Path pidFile = pidfileOption.value(options);
        final boolean quiet = options.has(quietOption);
        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }
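putSystemPropertyIfSettingIsMissing copies the -Des.path.* system properties set by the startup script into the settings map. Below is a minimal sketch that assumes only what the method name says. The helper class is hypothetical. The real method may treat a setting supplied both ways differently, for example by rejecting the duplicate:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: copy a JVM system property into the settings map
// only when the setting is not already present.
final class SystemPropertySettings {
    private SystemPropertySettings() {}

    static void putSystemPropertyIfSettingIsMissing(Map<String, String> settings, String setting, String key) {
        String value = System.getProperty(key); // null if -Dkey=... was not given
        if (value != null && !settings.containsKey(setting)) {
            settings.put(setting, value);
        }
    }

    /** Collects the path.* settings in the same order as EnvironmentAwareCommand#execute above. */
    static Map<String, String> collectPathSettings() {
        Map<String, String> settings = new HashMap<>();
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        return settings;
    }
}
```

In this sketch an already-present setting wins and the system property is ignored.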

Bootstrap initialization

PATH

elasticsearch\bootstrap\Bootstrap.java

CODE

    /**
     * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
     */
    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
            
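        // 1. Create the Bootstrap instance; its constructor creates (but does not yet start) the keepAlive thread
        INSTANCE = new Bootstrap();
        
        // 2. Load the secure settings (keystore) and the logging configuration, then create the PID file
        final SecureSettings keystore = loadSecureSettings(initialEnv);
        // (abridged) `environment` used below is built from initialEnv plus the keystore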
        try {
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        if (environment.pidFile() != null) {
            try {
                PidFile.create(environment.pidFile(), true);
            } catch (IOException e) {
                throw new BootstrapException(e);
            }
        }
        try {
            // Check the Lucene jar version
            checkLucene();
            // 3. Create (set up) the Node
            INSTANCE.setup(true, environment);
            // 4. Start the Node
            INSTANCE.start();

        } catch (NodeValidationException | RuntimeException e) {
            // (abridged) the real code logs the failure here, then rethrows
            throw e;
        }
    }
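PidFile.create(environment.pidFile(), true) writes the process ID into the file given with -p. Its second argument asks for the file to be removed on exit. Below is a minimal sketch of such a helper, assuming Java 9+ for ProcessHandle. PidFileSketch is illustrative and is not the Elasticsearch class:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical PID-file helper; not org.elasticsearch.bootstrap's PidFile.
final class PidFileSketch {
    private PidFileSketch() {}

    /** Writes the current process ID to path and returns it. */
    static long create(Path path, boolean deleteOnExit) throws IOException {
        long pid = ProcessHandle.current().pid(); // Java 9+
        Path parent = path.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent); // make sure the target directory exists
        }
        Files.write(path, Long.toString(pid).getBytes(StandardCharsets.UTF_8));
        if (deleteOnExit) {
            path.toFile().deleteOnExit(); // removed when the JVM exits normally
        }
        return pid;
    }
}
```

Scripts and service managers can read this file to find the process to signal when stopping ES.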

Analysis

Bootstrap#init performs the following steps in order:

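  1. Create the Bootstrap object, which creates keepAliveThread. The thread waits for keepAliveLatch to reach 0 and then exits. The constructor also adds a shutdown hook to the runtime. When the process exits, the hook counts keepAliveLatch down to 0, so keepAliveThread exits. The Java Virtual Machine exits when the only threads running are all daemon threads. Therefore, once keepAliveThread (the only remaining non-daemon thread) exits, the JVM shuts down.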
    /** creates a new instance */
    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // Wait for the process to exit: leave this thread once keepAliveLatch reaches 0
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // On process exit, count keepAliveLatch down to 0 so that keepAliveThread exits
                keepAliveLatch.countDown();
            }
        });
    }
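  2. Set up the secure settings and the logging configuration, and create the PID file. The pidFile records the ES process ID. Exclusive use of the data paths is enforced separately, by the lock that NodeEnvironment takes on them in Step 2.
  3. Create the Node. An ES node is encapsulated in a Node instance, which drives the ES modules that provide cluster management, indexing, search and other functions.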
    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {

        try {
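            // For each module under modules/ that ships a native controller program, spawn it as a separate process; these processes stay connected to the JVM through stdin, stdout and stderr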
            spawner.spawnNativeControllers(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        final Settings settings = environment.settings();
        // Native environment checks and setup (user, threads, virtual memory, file size, etc.)
        initializeNatives(
                environment.tmpFile(),
                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
        
        // Create the Node (detailed in the next section)
        node = new Node(environment) {
            @Override
            protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
                BootstrapChecks.check(context, boundTransportAddress, checks);
            }
        };
    }
  4. Start the Node
    private void start() throws NodeValidationException {
        // Start the Node (detailed later)
        node.start();
        // Start the non-daemon keepAliveThread, which keeps the JVM alive until the process is shut down
        keepAliveThread.start();
    }
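The keep-alive mechanism from steps 1 and 4 can be reproduced with plain JDK classes. In the minimal sketch below (KeepAliveSketch is a hypothetical name), a non-daemon thread parks on a CountDownLatch. A shutdown hook releases the latch so that the thread, and therefore the JVM, can finish:

```java
import java.util.concurrent.CountDownLatch;

// Minimal sketch of the Bootstrap keep-alive pattern (illustrative class, not Elasticsearch code).
final class KeepAliveSketch {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    final Thread keepAliveThread;

    KeepAliveSketch(boolean registerShutdownHook) {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await(); // park until shutdown is requested
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // bail out
            }
        }, "sketch[keepAlive]");
        keepAliveThread.setDaemon(false); // a live non-daemon thread keeps the JVM running
        if (registerShutdownHook) {
            // on JVM shutdown (e.g. SIGTERM), release the latch so the thread can finish
            Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown));
        }
    }

    void start() {
        keepAliveThread.start();
    }

    /** Lets the keep-alive thread exit; in Bootstrap this happens from the shutdown hook. */
    void release() {
        keepAliveLatch.countDown();
    }
}
```

In a real process you would pass true and call start(). The JVM then stays up even after main returns, until it receives a shutdown signal. The release() method exists only so the sketch can be exercised without shutting the JVM down.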

Step 2: Create the Node

PATH

elasticsearch\node\Node.java

CODE

The code is long, so it has been heavily abridged:

    protected Node(final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {

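        final List<Closeable> resourcesToClose = new ArrayList<>(); // released if construction fails
        boolean success = false;
        try {
            // (abridged) tmpSettings is first built from environment.settings()
            // 1. Create the node environment (nodeId, nodePaths, logger, etc.); tmpSettings mainly holds node configuration
            // create the node environment as soon as possible, to recover the node id and enable logging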
            try {
                nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
            } catch (IOException ex) {
                throw new IllegalStateException("Failed to create node environment", ex);
            }
            final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
            final String nodeId = nodeEnvironment.nodeId();
            tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
            final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
            // this must be captured after the node name is possibly added to the settings
            final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
            if (hadPredefinedNodeName == false) {
                logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
            } else {
                logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
            }
            // 2. Log the JVM information
            final JvmInfo jvmInfo = JvmInfo.jvmInfo();
            logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));

            // 3. Create PluginsService, which loads every module under modules/ and every plugin under plugins/
            this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);
            
            // 4. Create Node.environment
            this.environment = new Environment(this.settings, environment.configFile());
             
            // 5. Call each plugin's getExecutorBuilders to collect the ExecutorBuilders (custom thread pools)
            final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
            
            // 6. Create the thread pool
            final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
            
            // 7. Create the NodeClient
            client = new NodeClient(settings, threadPool);
            
            // 8. Create the service objects (*Service) and module objects (*Module)
            final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
            final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
            AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
            ...
            final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

            // Bind the service and module instances (Guice)
            modules.add(b -> {
                    b.bind(Node.class).toInstance(this);
                    b.bind(NodeService.class).toInstance(nodeService);
                    ...
                    b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
                    b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
                }
            );
            injector = modules.createInjector();

            // 9. Initialize the REST handlers that will later receive HTTP REST requests
            if (NetworkModule.HTTP_ENABLED.get(settings)) {
                logger.debug("initializing HTTP handlers ...");
                actionModule.initRestHandlers(() -> clusterService.state().nodes());
            }
            
            // Node initialization complete
            logger.info("initialized");
            success = true;
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to bind service", ex);
        } finally {
            if (!success) {
                IOUtils.closeWhileHandlingException(resourcesToClose);
            }
        }
    }
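NodeEnvironment also locks the data directories (point 1 of the analysis below), which stops two nodes from using the same data path. Elasticsearch does this with a lock file. The sketch below shows the underlying idea with java.nio FileChannel.tryLock. It assumes a single lock file named node.lock directly inside the data path; the real directory layout is more involved. DataPathLockSketch is hypothetical:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of exclusive data-path locking; layout and file name are simplified.
final class DataPathLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;

    private DataPathLockSketch(FileChannel channel, FileLock lock) {
        this.channel = channel;
        this.lock = lock;
    }

    /** Returns a held lock, or null if the data path is already locked. */
    static DataPathLockSketch tryLock(Path dataPath) throws IOException {
        Files.createDirectories(dataPath);
        FileChannel channel = FileChannel.open(dataPath.resolve("node.lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock(); // null if another process holds the lock
            if (lock != null) {
                return new DataPathLockSketch(channel, lock);
            }
        } catch (OverlappingFileLockException e) {
            // this JVM already holds a lock on the file (e.g. a second in-process "node")
        } catch (IOException e) {
            channel.close();
            throw e;
        }
        channel.close();
        return null;
    }

    @Override
    public void close() throws IOException {
        lock.release();
        channel.close();
    }
}
```

Within one JVM, a second tryLock on the same file throws OverlappingFileLockException. Across processes, tryLock returns null instead. The sketch reports both cases as "already locked".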

Analysis

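  1. Create the node environment, including nodeId, nodePaths and the logger, and create tmpSettings, which mainly holds node configuration. The data directories are locked at this point.
  2. Log the JVM information
  3. Create PluginsService, which loads all modules and plugins from the classpath, the modules directory and the plugins directory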
    public PluginsService(Settings settings, Path configPath, Path modulesDirectory, Path pluginsDirectory, Collection<Class<? extends Plugin>> classpathPlugins) {

        List<Tuple<PluginInfo, Plugin>> pluginsLoaded = new ArrayList<>();
        List<PluginInfo> pluginsList = new ArrayList<>();
        final List<String> pluginsNames = new ArrayList<>();

        // Load plugins from the classpath (used by tests and transport clients)
        for (Class<? extends Plugin> pluginClass : classpathPlugins) {
            // (abridged) plugin and pluginInfo are created from pluginClass here
            pluginsLoaded.add(new Tuple<>(pluginInfo, plugin));
            pluginsList.add(pluginInfo);
        }

        Set<Bundle> seenBundles = new LinkedHashSet<>();
        List<PluginInfo> modulesList = new ArrayList<>();
        
        // Load the modules
        if (modulesDirectory != null) {
            Set<Bundle> modules = getModuleBundles(modulesDirectory);
            for (Bundle bundle : modules) {
                ...
            }
            seenBundles.addAll(modules);
        }

        // Load the plugins under plugins/
        if (pluginsDirectory != null) {
            Set<Bundle> plugins = getPluginBundles(pluginsDirectory);
            for (final Bundle bundle : plugins) {
                pluginsList.add(bundle.plugin);
            }
            seenBundles.addAll(plugins);
        }
        

        // Every module and plugin loaded above is a bundle (a "bundle" is a group of jars in a single classloader),
        // so modules and plugins can be handled uniformly here and loaded as Plugin instances
        List<Tuple<PluginInfo, Plugin>> loaded = loadBundles(seenBundles);
        pluginsLoaded.addAll(loaded);
        
        // Save the plugin and module metadata in the PluginsAndModules info
        this.info = new PluginsAndModules(pluginsList, modulesList);
        
        // Store the loaded Plugins in List<Tuple<PluginInfo, Plugin>> plugins
        this.plugins = Collections.unmodifiableList(pluginsLoaded);

    }
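The Node constructor then selects plugins by extension point, for example with pluginsService.filterPlugins(ScriptPlugin.class) and filterPlugins(AnalysisPlugin.class). Below is a minimal sketch of such a type filter over the loaded plugin list. Hypothetical marker interfaces stand in for Plugin, ScriptPlugin and AnalysisPlugin:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for Plugin and two extension interfaces.
interface SketchPlugin {}
interface SketchScriptPlugin extends SketchPlugin {}
interface SketchAnalysisPlugin extends SketchPlugin {}

final class SketchPluginsService {
    private final List<SketchPlugin> plugins;

    SketchPluginsService(List<SketchPlugin> plugins) {
        this.plugins = new ArrayList<>(plugins); // defensive copy of the loaded plugins
    }

    /** Returns every loaded plugin that implements the requested extension interface. */
    <T> List<T> filterPlugins(Class<T> type) {
        return plugins.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }
}
```

A plugin that implements several extension interfaces is returned by each corresponding filter.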
  4. Create Node.environment
  5. Call each plugin's getExecutorBuilders to collect their ExecutorBuilders
  6. Create the ThreadPool
    // ThreadPool constructor:
    public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
 
        final Map<String, ExecutorBuilder> builders = new HashMap<>();
        // Number of available processors; the examples below assume 8
        final int availableProcessors = EsExecutors.numberOfProcessors(settings);
        // (cpu + 1) / 2 clamped to [1, 5]: here (8 + 1) / 2 = 4, which stays 4
        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
        // (cpu + 1) / 2 clamped to [1, 10]: here (8 + 1) / 2 = 4, which stays 4
        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
        // 4 * 8 = 32, clamped to [128, 512]: genericThreadPoolMax = 128
        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
        
        // Create the builders for the various thread pools
        builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
        builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
        builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
        ...

        threadContext = new ThreadContext(settings);
        
        // Create the thread pools
        final Map<String, ExecutorHolder> executors = new HashMap<>();
        for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
            executors.put(entry.getKey(), executorHolder);
        }
        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
        this.executors = unmodifiableMap(executors);
    }
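The sizing rules in the constructor comments are plain clamping arithmetic. The sketch below reproduces them and adds the SEARCH formula from the thread pool table. The helper names follow the excerpt; their bodies are assumptions consistent with its comments:

```java
// Sketch of the thread-pool sizing helpers described in the ThreadPool constructor comments.
final class PoolSizing {
    private PoolSizing() {}

    /** Clamps value into [min, max]. */
    static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    static int halfNumberOfProcessorsMaxFive(int processors) {
        return boundedBy((processors + 1) / 2, 1, 5);
    }

    static int halfNumberOfProcessorsMaxTen(int processors) {
        return boundedBy((processors + 1) / 2, 1, 10);
    }

    static int genericThreadPoolMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    /** SEARCH pool size from the table: ((processors * 3) / 2) + 1. */
    static int searchThreadPoolSize(int processors) {
        return ((processors * 3) / 2) + 1;
    }
}
```

With 8 processors this gives 4, 4, 128 and 13. With 32 processors, the two halved values hit their caps (5 and 10). The generic maximum only rises above 128 once 4 × processors exceeds it, that is, with more than 32 processors.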

Thread pool types:

  • DIRECT: i.e. org.elasticsearch.common.util.concurrent.EsExecutors#DIRECT_EXECUTOR_SERVICE. It is an ExecutorService that executes submitted tasks on the current (calling) thread by invoking Runnable.run() directly. This executor service does not support being shut down.
  • FIXED: a fixed number of threads backed by a queue of fixed length. When no thread is idle, requests wait in the queue. Parameters:
      size        the fixed number of threads
      queueSize   the size of the backing queue, -1 for unbounded
  • SCALING: the number of threads varies dynamically between core and max. Parameters:
      core        the minimum number of threads in the pool
      max         the maximum number of threads in the pool
      keepAlive   the time that spare threads above core threads will be kept alive
  • FIXED_AUTO_QUEUE_SIZE: a fixed number of threads backed by a queue whose length is adjusted automatically. Parameters:
      size              the fixed number of threads
      initialQueueSize  initial size of the backing queue
      minQueueSize      the minimum size of the backing queue
      maxQueueSize      the maximum size of the backing queue
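Two of these types map directly onto java.util.concurrent. The sketch below builds a DIRECT executor that runs tasks on the caller's thread and a FIXED pool whose bounded queue rejects work once it is full. PoolTypesSketch is illustrative and is not the EsExecutors code. SCALING is left out on purpose. A plain ThreadPoolExecutor only creates threads beyond its core size when its queue refuses a task, so with an unbounded queue it never grows past core. A scaling pool therefore needs a queue that declines tasks while more threads may still be started.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative mapping of DIRECT and FIXED onto plain java.util.concurrent.
final class PoolTypesSketch {
    private PoolTypesSketch() {}

    /** DIRECT: run each task on the calling thread; there is nothing to shut down. */
    static final Executor DIRECT = Runnable::run;

    /**
     * FIXED: exactly size threads. queueSize < 0 means an unbounded queue;
     * otherwise at most queueSize tasks wait (queueSize must then be >= 1)
     * and further tasks are rejected with RejectedExecutionException.
     */
    static ThreadPoolExecutor fixed(int size, int queueSize) {
        BlockingQueue<Runnable> queue = queueSize < 0
                ? new LinkedBlockingQueue<Runnable>()
                : new ArrayBlockingQueue<Runnable>(queueSize);
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue,
                new ThreadPoolExecutor.AbortPolicy());
    }
}
```

With fixed(1, 1), one task runs, a second waits in the queue, and a third is rejected until the pool catches up.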

Thread pools in ES:

| Thread pool | Type | Purpose | Default parameters |
|---|---|---|---|
| SAME | DIRECT | Runs the logic directly on the current thread | n/a |
| LISTENER | FIXED | Runs logic when the Java client receives a response | size: min((availableProcessors + 1) / 2, 10), queueSize: 10 |
| GET | FIXED | Get requests | size: availableProcessors, queueSize: 200 |
| ANALYZE | FIXED | Analyze (tokenization) operations | size: 1, queueSize: 16 |
| WRITE | FIXED | Index (put) and bulk requests | size: availableProcessors, queueSize: 200 |
| FORCE_MERGE | FIXED | Segment force-merge operations | size: 1, queueSize: -1 |
| GENERIC | SCALING | Generic work, such as node discovery | core: 4, max: availableProcessors * 4 clamped to [128, 512], keepAlive: 30s |
| MANAGEMENT | SCALING | Cluster management and similar tasks | core: 1, max: 5, keepAlive: 5m |
| FLUSH | SCALING | Flush operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| REFRESH | SCALING | Refresh operations | core: 1, max: min((availableProcessors + 1) / 2, 10), keepAlive: 5m |
| WARMER | SCALING | Segment warm-up operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| SNAPSHOT | SCALING | Snapshot operations | core: 1, max: min((availableProcessors + 1) / 2, 5), keepAlive: 5m |
| FETCH_SHARD_STARTED | SCALING | Fetching shard-started information | core: 1, max: availableProcessors * 2, keepAlive: 5m |
| FETCH_SHARD_STORE | SCALING | Fetching shard store information | core: 1, max: availableProcessors * 2, keepAlive: 5m |
| SEARCH | FIXED_AUTO_QUEUE_SIZE | Search, count and suggest requests | size: ((availableProcessors * 3) / 2) + 1, initialQueueSize: 1000, minQueueSize: 1000, maxQueueSize: 1000 |

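  7. Create the client, a NodeClient
  8. Create the service objects (xxxService) and use Guice to register the modules those services use (xxxModule)
  • Services:

| Service | Description |
|---|---|
| ResourceWatcherService | Periodically checks registered resource watchers (for example, files on disk) and notifies them of changes |
| NetworkService | Network configuration management (TCP, IP, ports) |
| ClusterService | Cluster management: publishing and updating the cluster state |
| IngestService | Pre-processing of incoming documents on ingest nodes |
| ClusterInfoService | Retrieves up-to-date cluster information |
| UsageService | Tracks how the various Elasticsearch features are used |
| MonitorService | Monitoring of the JVM, the process and the operating system |
| CircuitBreakerService | Circuit breaking: stops tasks from running when resource usage exceeds its limits |
| MetaStateService | Reads and writes MetaData and IndexMetaData |
| IndicesService | Index management, such as creating and deleting indices |
| IndicesClusterStateService | Applies index-related changes when the cluster state is updated |
| MetaDataIndexUpgradeService | Upgrades IndexMetaData to the current version |
| TemplateUpgradeService | Upgrades plugin-related templates when a node joins the cluster |
| TransportService | Transport-layer networking |
| HttpServerTransport | HTTP-layer networking, serving the REST API |
| ResponseCollectorService | Collects per-node statistics on task queue size, response time and service time |
| NodeService | Node-level service that holds and invokes the various other services |
| SearchService | Executes search tasks |
| SnapshotsService | Snapshot service |
| Discovery | Cluster discovery |
| RoutingService | Routing table management |
| GatewayService | Persistence and recovery of cluster and index metadata |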

  • Modules: ScriptModule, AnalysisModule, SettingsModule, PluginModule, ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule. Each module's role corresponds to the Service of the same name above.
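Point 8 registers instances with Guice (b.bind(X.class).toInstance(x)), and Node#start later retrieves them with injector.getInstance(X.class). The bind-then-look-up idea can be shown with a tiny class-keyed registry. This is not Guice and does none of its dependency resolution; TinyInjector is hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry illustrating bind(...).toInstance(...) followed by getInstance(...).
final class TinyInjector {
    private final Map<Class<?>, Object> bindings = new HashMap<>();

    /** Mimics b.bind(type).toInstance(instance); each type may be bound once. */
    <T> TinyInjector bind(Class<T> type, T instance) {
        if (bindings.putIfAbsent(type, instance) != null) {
            throw new IllegalStateException("already bound: " + type.getName());
        }
        return this;
    }

    /** Mimics injector.getInstance(type); an unbound type simply fails here. */
    <T> T getInstance(Class<T> type) {
        Object instance = bindings.get(type);
        if (instance == null) {
            throw new IllegalStateException("no binding for " + type.getName());
        }
        return type.cast(instance);
    }
}
```

Binding every service once during construction and looking it up by type later is what lets Node#start fetch IndicesService, SearchService and the others without holding a field for each.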

Step 3: Start the Node

PATH

elasticsearch\node\Node.java

CODE

    /**
     * Start the node. If the node is already started, this method is no-op.
     */
    public Node start() throws NodeValidationException {
        // 1. Lifecycle state machine: move the local node to STARTED (no-op if it is already started)
        if (!lifecycle.moveToStarted()) {
            return this;
        }


        logger.info("starting ...");

        // LifecycleComponent in modules and plugins start
        pluginLifecycleComponents.forEach(LifecycleComponent::start);

        // 2. Fetch the module and service instances bound when the Node was created, and start them
        // AbstractLifecycleComponent.start() -> class.doStart()

        injector.getInstance(MappingUpdatedAction.class).setClient(client);
        injector.getInstance(IndicesService.class).start();
        injector.getInstance(IndicesClusterStateService.class).start();
        injector.getInstance(SnapshotsService.class).start();
        injector.getInstance(SnapshotShardsService.class).start();
        injector.getInstance(RoutingService.class).start();
        injector.getInstance(SearchService.class).start();
        nodeService.getMonitorService().start();
        ...
        Discovery discovery = injector.getInstance(Discovery.class);
        clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

        // Start the transport service
        TransportService transportService = injector.getInstance(TransportService.class);
        transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
        transportService.start();
        
        // Load the on-disk MetaData
        final MetaData onDiskMetadata;
        try {
            if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
                onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
            } else {
                onDiskMetadata = MetaData.EMPTY_META_DATA;
            }

        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        
        // Bootstrap checks: BootstrapChecks.check
        validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(),
            pluginsService.filterPlugins(Plugin.class).stream()
                .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

        // Initialize the ClusterState and start Discovery
        discovery.start();
        // Start clusterService (clusterApplierService, masterService)
        clusterService.start();
        
        // The transport layer starts accepting incoming requests
        transportService.acceptIncomingRequests();
        // initial discovery -> ZenDiscovery.java innerJoinCluster(): join the cluster
        discovery.startInitialJoin();

        // The HTTP layer starts and begins accepting requests
        injector.getInstance(HttpServerTransport.class).start();

        // Node started successfully
        logger.info("started");

        pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

        return this;
    }
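lifecycle.moveToStarted() is what makes start() a no-op on a second call. Below is a minimal sketch of such a state machine, assuming these transitions: INITIALIZED or STOPPED can move to STARTED, starting twice is a no-op, and CLOSED is terminal. LifecycleSketch is hypothetical, and the real Lifecycle class may differ in detail:

```java
// Hypothetical lifecycle state machine in the spirit of lifecycle.moveToStarted().
final class LifecycleSketch {
    enum State { INITIALIZED, STARTED, STOPPED, CLOSED }

    private volatile State state = State.INITIALIZED;

    State state() {
        return state;
    }

    /** Returns true if the caller should run its start logic, false if already started. */
    synchronized boolean moveToStarted() {
        switch (state) {
            case INITIALIZED:
            case STOPPED:
                state = State.STARTED;
                return true;
            case STARTED:
                return false; // Node#start returns early in this case
            default:
                throw new IllegalStateException("cannot start from state " + state);
        }
    }

    synchronized boolean moveToStopped() {
        if (state != State.STARTED) {
            return false;
        }
        state = State.STOPPED;
        return true;
    }

    synchronized void close() {
        state = State.CLOSED;
    }
}
```

A caller follows the same pattern as Node#start: `if (!lifecycle.moveToStarted()) return this;` and only then starts the components.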

Analysis

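Starting the node mainly means fetching the instances bound to each service and module and calling each instance's start() method (which actually runs class.doStart()) to start the services. The most important steps are: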

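  1. Start the transport service, so that the node can later join the cluster through discovery
  2. If the node is master-eligible or a data node (node.master or node.data is true), load the on-disk metadata to recover the previous cluster information (for example, when a node restarts after a crash)
  3. Run the bootstrap checks, which inspect the environment ES is running in, mainly the operating system and JVM settings, as shown in the figure below. When the checks are enforced and some of them fail, ES reports an error and exits. The official Bootstrap Checks documentation explains what each check means.
  4. Start discovery and clusterService, and initialize the cluster metadata (ClusterState)
  5. Let the transport layer accept incoming requests, for communication between nodes
  6. Run initial discovery to join the node's Elasticsearch cluster
  7. Start the HTTP service and begin accepting user requests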
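Whether the bootstrap checks in point 3 are enforced depends on the addresses the node uses, as the log line "bound or publishing to a non-loopback address, enforcing bootstrap checks" below suggests. The sketch below shows that decision under an assumed rule: enforce if the publish address or any bound address is not a loopback address. The real check also considers the discovery type; for example, single-node discovery bypasses it. BootstrapEnforcementSketch is hypothetical:

```java
import java.net.InetAddress;
import java.util.List;

// Hypothetical sketch of the "development vs. production mode" decision for bootstrap checks.
final class BootstrapEnforcementSketch {
    private BootstrapEnforcementSketch() {}

    /** True if the node is reachable beyond loopback, i.e. the checks should be enforced. */
    static boolean enforceChecks(List<InetAddress> boundAddresses, InetAddress publishAddress) {
        boolean anyBoundNonLoopback = boundAddresses.stream().anyMatch(a -> !a.isLoopbackAddress());
        return anyBoundNonLoopback || !publishAddress.isLoopbackAddress();
    }
}
```

For the node in the log below (bound to 127.0.0.1 and ::1, publishing 10.40.98.48), this returns true, which matches the enforcing message.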

Startup logs

Finally, let's check the startup flow described above against an ES node's logs:

# Start creating the Node
[2018-12-28T19:54:44,159][INFO ][o.e.n.Node               ] [] initializing ...
# Read the local data-path and JVM heap information, and create NodeEnvironment
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment    ] [V9VXhfr] using [1] data paths, mounts [[项目 (D:)]], net usable_space [271.7gb], net total_space [310gb], types [NTFS]
[2018-12-28T19:54:44,376][INFO ][o.e.e.NodeEnvironment    ] [V9VXhfr] heap size [1gb], compressed ordinary object pointers [true]
# Log the node name and other node information
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] node name derived from node ID [V9VXhfr9TvSyUfxyr-ZQWg]; set [node.name] to override
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] version[6.4.3-SNAPSHOT], pid[19512], build[unknown/unknown/Unknown/Unknown], OS[Windows 7/6.1/amd64], JVM["Oracle Corporation"/Java HotSpot(TM) 64-Bit Server VM/10.0.1/10.0.1+10]
# Log the JVM information
[2018-12-28T19:54:44,379][INFO ][o.e.n.Node               ] [V9VXhfr] JVM arguments [-agentlib:jdwp=transport=dt_socket,address=127.0.0.1:8831,suspend=y,server=n, -Des.path.home=D:\elasticsearch_release\elasticsearch-6.4.3, -Des.path.conf=D:\elasticsearch_release\elasticsearch-6.4.3\config, -Djava.security.policy=D:\elasticsearch_release\elasticsearch-6.4.3\config\java.policy, -Dlog4j2.disable.jmx=true, -Xms1g, -Xmx1g, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\groovyHotSwap\gragent.jar, -javaagent:C:\Users\morningchen\.IntelliJIdea2018.3\system\captureAgent\debugger-agent.jar, -Dfile.encoding=UTF-8]
# Initialize the Services and Modules
# Load the modules
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [aggs-matrix-stats]
[2018-12-28T19:54:53,359][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [analysis-common]
... 
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService     ] [V9VXhfr] loaded module [x-pack-watcher]
# Load plugins
[2018-12-28T19:54:53,362][INFO ][o.e.p.PluginsService     ] [V9VXhfr] no plugins loaded
[2018-12-28T19:54:58,078][DEBUG][o.e.a.ActionModule       ] Using REST wrapper from plugin org.elasticsearch.xpack.security.Security
[2018-12-28T19:54:58,271][INFO ][o.e.d.DiscoveryModule    ] [V9VXhfr] using discovery type [zen]
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node               ] [V9VXhfr] initialized
# Start the Node
[2018-12-28T19:54:59,102][INFO ][o.e.n.Node               ] [V9VXhfr] starting ...
# Start the transport service
[2018-12-28T19:54:59,428][INFO ][o.e.t.TransportService   ] [V9VXhfr] publish_address {10.40.98.48:9300}, bound_addresses {127.0.0.1:9300}, {[::1]:9300}
# BootstrapChecks
[2018-12-28T19:54:59,442][INFO ][o.e.b.BootstrapChecks    ] [V9VXhfr] bound or publishing to a non-loopback address, enforcing bootstrap checks
# Join the cluster (here the node elects itself master)
[2018-12-28T19:55:54,939][INFO ][o.e.c.s.MasterService    ] [V9VXhfr] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}
[2018-12-28T19:55:54,945][INFO ][o.e.c.s.ClusterApplierService] [V9VXhfr] new_master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, reason: apply cluster state (from master [master {V9VXhfr}{V9VXhfr9TvSyUfxyr-ZQWg}{m7N1OuBHRSuJlyGOoIJmqw}{10.40.98.48}{10.40.98.48:9300}{ml.machine_memory=17099067392, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]])
# Start the HTTP service
[2018-12-28T19:55:58,722][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [V9VXhfr] publish_address {10.40.98.48:9200}, bound_addresses {127.0.0.1:9200}, {[::1]:9200}
# Node startup complete
[2018-12-28T19:55:58,723][INFO ][o.e.n.Node               ] [V9VXhfr] started
# License check
[2018-12-28T19:55:58,902][INFO ][o.e.l.LicenseService     ] [V9VXhfr] license [a4e08819-7a8b-4017-8b85-5329bc2909b0] mode [basic] - valid
# Start recovering local data
[2018-12-28T19:55:58,926][INFO ][o.e.g.GatewayService     ] [V9VXhfr] recovered [0] indices into cluster_state
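To read phase durations off such a log, it is enough to parse the leading timestamps. Below is a small sketch that assumes every line begins with a bracketed yyyy-MM-ddTHH:mm:ss,SSS timestamp, as in the log above. LogTiming is a hypothetical helper:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: measure the time between two log lines by their leading timestamps.
final class LogTiming {
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS");

    private LogTiming() {}

    static LocalDateTime timestampOf(String logLine) {
        int end = logLine.indexOf(']');
        if (!logLine.startsWith("[") || end < 0) {
            throw new IllegalArgumentException("no leading [timestamp] in: " + logLine);
        }
        return LocalDateTime.parse(logLine.substring(1, end), TS);
    }

    static Duration between(String earlierLine, String laterLine) {
        return Duration.between(timestampOf(earlierLine), timestampOf(laterLine));
    }
}
```

For the "initializing ..." and "started" lines above, it returns 74,564 ms. About 55.5 s of that falls between the BootstrapChecks line (19:54:59,442) and the master election (19:55:54,939).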

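Original content statement: this article was published on the Tencent Cloud+ Community with the author's authorization and may not be reproduced without permission.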
