
Elasticsearch Source Code Analysis, Part 7: The Cluster Election Process

Author: 山行AI
Published 2020-03-12 18:23:09
This article is collected in the column: 山行AI

The org.elasticsearch.discovery.zen.ZenDiscovery module is the main module responsible for master election in an Elasticsearch cluster.

Initialization

The org.elasticsearch.node.Node#start method contains the part that initializes ZenDiscovery:

Discovery discovery = injector.getInstance(Discovery.class);
// set the cluster state publisher
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
----------------------
// start after transport service so the local disco is known
discovery.start();
// start before cluster service so that it can set initial state on ClusterApplierService
-----------------------
// begin the initial join
discovery.startInitialJoin();

As covered in an earlier article, Elasticsearch uses Guice as its IoC container. Here the Discovery instance is first obtained from the injector, then start is called, followed by the initial join of the cluster. We will analyze each of these steps next.

ZenDiscovery

Let's first look at the class structure diagram:
Constructor
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
                    NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
                    ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
                    Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, RerouteService rerouteService) {
    this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
    this.masterService = masterService;
    this.clusterApplier = clusterApplier;
    this.transportService = transportService;
    this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
    this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
    this.zenPing = newZenPing(settings, threadPool, transportService, hostsProvider);
    this.electMaster = new ElectMasterService(settings);
    this.pingTimeout = PING_TIMEOUT_SETTING.get(settings);
    this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
    this.joinRetryAttempts = JOIN_RETRY_ATTEMPTS_SETTING.get(settings);
    this.joinRetryDelay = JOIN_RETRY_DELAY_SETTING.get(settings);
    this.maxPingsFromAnotherMaster = MAX_PINGS_FROM_ANOTHER_MASTER_SETTING.get(settings);
    this.sendLeaveRequest = SEND_LEAVE_REQUEST_SETTING.get(settings);
    this.threadPool = threadPool;
    this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
    this.committedState = new AtomicReference<>();
    // whether nodes with node.master set to false take part in master election
    this.masterElectionIgnoreNonMasters = MASTER_ELECTION_IGNORE_NON_MASTER_PINGS_SETTING.get(settings);
    // timeout for waiting on joins during election
    this.masterElectionWaitForJoinsTimeout = MASTER_ELECTION_WAIT_FOR_JOINS_TIMEOUT_SETTING.get(settings);

    logger.debug("using ping_timeout [{}], join.timeout [{}], master_election.ignore_non_master [{}]",
            this.pingTimeout, joinTimeout, masterElectionIgnoreNonMasters);

    clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
        this::handleMinimumMasterNodesChanged, (value) -> {
            final ClusterState clusterState = this.clusterState();
            int masterNodes = clusterState.nodes().getMasterNodes().size();
            if (clusterState.nodes().isLocalNodeElectedMaster() && value > masterNodes) {
                throw new IllegalArgumentException("cannot set "
                    + ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.getKey() + " to more than the current" +
                    " master nodes count [" + masterNodes + "]");
            }
    });
    // master fault detection
    this.masterFD = new MasterFaultDetection(settings, threadPool, transportService, this::clusterState, masterService, clusterName);
    // listener for master node failures
    this.masterFD.addListener(new MasterNodeFailureListener());
    // fault detection for the other cluster nodes
    this.nodesFD = new NodesFaultDetection(settings, threadPool, transportService, this::clusterState, clusterName);
    // listener for node fault detection
    this.nodesFD.addListener(new NodeFaultDetectionListener());
    // queue of pending cluster states
    this.pendingStatesQueue = new PendingClusterStatesQueue(logger, MAX_PENDING_CLUSTER_STATES_SETTING.get(settings));
    // action that publishes the cluster state
    this.publishClusterState =
            new PublishClusterStateAction(
                    transportService,
                    namedWriteableRegistry,
                    this,
                    discoverySettings);
    // MembershipAction defines three request types (LeaveRequest, JoinRequest, ValidateJoinRequest)
    // together with their TransportRequestHandlers: LeaveRequestRequestHandler,
    // JoinRequestRequestHandler and ValidateJoinRequestRequestHandler
    this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators);
    // controller for the join thread; mostly methods that manage the joinThread
    this.joinThreadControl = new JoinThreadControl();
    // node-join handler containing the election methods
    this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, rerouteService);
    // removes nodes after failures; may also trigger replica promotion
    this.nodeRemovalExecutor = new ZenNodeRemovalClusterStateTaskExecutor(allocationService, electMaster,
            this::submitRejoin, logger);

    masterService.setClusterStateSupplier(this::clusterState);
    // register a request handler with the transportService
    transportService.registerRequestHandler(
        DISCOVERY_REJOIN_ACTION_NAME, ThreadPool.Names.SAME, RejoinClusterRequest::new, new RejoinClusterRequestHandler());
}

discovery.start()

ZenDiscovery inherits this method from its parent class org.elasticsearch.common.component.AbstractLifecycleComponent, whose start method is defined as follows:

@Override
public void start() {
    synchronized (lifecycle) {
        if (!lifecycle.canMoveToStarted()) {
            return;
        }
        for (LifecycleListener listener : listeners) {
            listener.beforeStart();
        }
        doStart();
        lifecycle.moveToStarted();
        for (LifecycleListener listener : listeners) {
            listener.afterStart();
        }
    }
}
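The start/doStart split above is a textbook template method: the base class serializes state transitions and listener callbacks, while subclasses supply only doStart. A minimal, self-contained sketch of that shape (simplified, hypothetical classes, not the real AbstractLifecycleComponent, which also tracks stopped/closed states):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the start/doStart template-method split.
abstract class LifecycleComponent {
    private boolean started = false;                 // stands in for lifecycle.canMoveToStarted()
    private final List<Runnable> beforeStart = new ArrayList<>();
    private final List<Runnable> afterStart = new ArrayList<>();

    void addBeforeStartListener(Runnable r) { beforeStart.add(r); }
    void addAfterStartListener(Runnable r) { afterStart.add(r); }

    public final synchronized void start() {
        if (started) {
            return;                                  // canMoveToStarted() == false
        }
        beforeStart.forEach(Runnable::run);          // listener.beforeStart()
        doStart();                                   // subclass hook (e.g. ZenDiscovery#doStart)
        started = true;                              // lifecycle.moveToStarted()
        afterStart.forEach(Runnable::run);           // listener.afterStart()
    }

    protected abstract void doStart();
}

class DemoDiscovery extends LifecycleComponent {
    int doStartCalls = 0;
    @Override protected void doStart() { doStartCalls++; }
}
```

Because the started flag is checked under the lock, calling start() a second time is a no-op and doStart runs exactly once.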

As you can see, this method mainly wraps some work around doStart; the core is the doStart() method itself, implemented in org.elasticsearch.discovery.zen.ZenDiscovery#doStart:

@Override
protected void doStart() {
    // obtain the local node
    DiscoveryNode localNode = transportService.getLocalNode();
    assert localNode != null;
    synchronized (stateMutex) { // state lock
        // set initial state
        assert committedState.get() == null;
        assert localNode != null;
        ClusterState.Builder builder = ClusterState.builder(clusterName);
        ClusterState initialState = builder
            .blocks(ClusterBlocks.builder()
                // indicates the node has just started and has not recovered yet;
                // the block is removed once the node recovers
                .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                .addGlobalBlock(noMasterBlockService.getNoMasterBlock()))
            .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
            .build();
        committedState.set(initialState);
        clusterApplier.setInitialState(initialState);
        // register the local node
        nodesFD.setLocalNode(localNode);
        // mark the join thread control as started
        joinThreadControl.start();
    }
    // zenPing is used for pinging during master election, to learn the state of the other nodes
    zenPing.start();
}
  • nodesFD is of type NodesFaultDetection and is used to detect failures of the other cluster nodes. doStart initializes the node's cluster state and then starts joinThreadControl, the thread controller responsible for joining the cluster; all of this happens under the stateMutex lock to avoid concurrency problems. JoinThreadControl contains many checks such as assert Thread.holdsLock(stateMutex), which verifies that the current thread holds the stateMutex lock. The code of JoinThreadControl:
/**
 * All control of the join thread should happen under the cluster state update task thread.
 * This is important to make sure that the background joining process is always in sync with any cluster state updates
 * like master loss, failure to join, received cluster state while joining etc.
 */
private class JoinThreadControl {

    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    /** returns true if join thread control is started and there is currently an active join thread */
    public boolean joinThreadActive() {
        Thread currentThread = currentJoinThread.get();
        return running.get() && currentThread != null && currentThread.isAlive();
    }

    /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
    public boolean joinThreadActive(Thread joinThread) {
        return running.get() && joinThread.equals(currentJoinThread.get());
    }

    /** cleans any running joining thread and calls {@link #rejoin}
     * 1. assert that the current thread holds the stateMutex lock
     * 2. currentJoinThread is an AtomicReference; reset its value to null
     * 3. call rejoin to rejoin the cluster
     */
    public void stopRunningThreadAndRejoin(String reason) {
        assert Thread.holdsLock(stateMutex);
        currentJoinThread.set(null);
        rejoin(reason);
    }

    /** starts a new joining thread if there is no currently active one and join thread controlling is started */
    public void startNewThreadIfNotRunning() {
        // the caller must hold the state lock
        assert Thread.holdsLock(stateMutex);
        // nothing to do if the control is started and a join thread is already active
        if (joinThreadActive()) {
            return;
        }
        threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
                Thread currentThread = Thread.currentThread();
                // claim the join-thread slot; if another thread already holds it, bail out
                if (!currentJoinThread.compareAndSet(null, currentThread)) {
                    return;
                }
                // running is set in ZenDiscovery.doStart via joinThreadControl.start();
                // loop while the control is running and this thread is still the active join thread
                while (running.get() && joinThreadActive(currentThread)) {
                    try {
                        // join the cluster
                        innerJoinCluster();
                        return;
                    } catch (Exception e) {
                        logger.error("unexpected error while joining cluster, trying again", e);
                        // Because we catch any exception here, we want to know in
                        // tests if an uncaught exception got to this point and the test infra uncaught exception
                        // leak detection can catch this. In practise no uncaught exception should leak
                        assert ExceptionsHelper.reThrowIfNotNull(e);
                    }
                }
                // cleaning the current thread from currentJoinThread is done by explicit calls.
            }
        });
    }

    /**
     * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
     * If the given thread is not the currently running join thread, the command is ignored.
     */
    public void markThreadAsDoneAndStartNew(Thread joinThread) {
        assert Thread.holdsLock(stateMutex);
        // reset currentJoinThread to null
        if (!markThreadAsDone(joinThread)) {
            return;
        }
        // start a new join thread
        startNewThreadIfNotRunning();
    }

    /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
    public boolean markThreadAsDone(Thread joinThread) {
        assert Thread.holdsLock(stateMutex);
        return currentJoinThread.compareAndSet(joinThread, null);
    }

    public void stop() {
        running.set(false);
        Thread joinThread = currentJoinThread.getAndSet(null);
        if (joinThread != null) {
            joinThread.interrupt();
        }
    }

    /** sets the running flag to true */
    public void start() {
        running.set(true);
    }
}
  • zenPing is of type UnicastZenPing and handles the pings exchanged between nodes during election; its start method is empty. Next, let's see how a node communicates with the cluster and takes part in the election.
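The single-active-join-thread guarantee in JoinThreadControl rests on a plain JDK primitive: AtomicReference.compareAndSet is used to claim the join-thread slot and to release it only from the thread that holds it. A condensed, runnable model of that claim/release protocol (JoinSlot is a hypothetical stand-in, not the real class):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Condensed model of JoinThreadControl's claim/release protocol.
class JoinSlot {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> current = new AtomicReference<>();

    void start() { running.set(true); }

    // A worker claims the slot only if no other thread holds it,
    // mirroring currentJoinThread.compareAndSet(null, currentThread).
    boolean tryClaim(Thread t) {
        return running.get() && current.compareAndSet(null, t);
    }

    // Mirrors markThreadAsDone: releasing succeeds only for the claiming thread.
    boolean release(Thread t) {
        return current.compareAndSet(t, null);
    }

    // Mirrors joinThreadActive().
    boolean active() {
        Thread t = current.get();
        return running.get() && t != null && t.isAlive();
    }
}
```

Because both the claim and the release are single CAS operations, no lock is needed around the slot itself; the real class additionally asserts Thread.holdsLock(stateMutex) so that claims and releases only happen under cluster-state updates.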

discovery.startInitialJoin()

Straight to the code:

@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    synchronized (stateMutex) {
        // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
        joinThreadControl.startNewThreadIfNotRunning();
    }
}

As you can see, this simply calls joinThreadControl.startNewThreadIfNotRunning, the method we walked through above.

On Elasticsearch's generic thread pool it executes innerJoinCluster, the method that handles joining the current node to the cluster:

/**
 * the main function of a join thread. This function is guaranteed to join the cluster
 * or spawn a new join thread upon failure to do so.
 */
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    // open the election context
    nodeJoinController.startElectionContext();
    // keep electing while no master is known and this is still the active join thread
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }
    // if the local node was elected master
    if (transportService.getLocalNode().equals(masterNode)) {
        // the minimum number of join requests required, derived from discovery.zen.minimum_master_nodes
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        synchronized (stateMutex) {
                            // elected as master: reset the join-thread state for the next election
                            joinThreadControl.markThreadAsDone(currentThread);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        synchronized (stateMutex) {
                            // run the election again
                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }
                }
        );
    } else {
        // process any incoming joins (they will fail because we are not the master)
        // not elected this round: close the election context
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request to the elected master
        final boolean success = joinElectedMaster(masterNode);

        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    // mark this join thread as done and start a new one
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // the master changed while we were joining: stop this join thread and rejoin
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }

                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}

It first calls nodeJoinController.startElectionContext() to prepare the election context, then enters the while loop, which keeps running as long as no master is known and the executing thread is still this node's active join thread; in other words, this step does not finish until a master has been elected. It then checks whether the elected master is the local node that just started (i.e. the node elected itself). If so, it performs the follow-up work such as resetting the join-thread state; if not, it tries to join the elected master. The election loop:

while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
    masterNode = findMaster();
}

Let's analyze the findMaster method and the joinElectedMaster(masterNode) method.

findMaster

private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    // all ping responses
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }
    // the local node
    final DiscoveryNode localNode = transportService.getLocalNode();

    // add our selves
    // first assert that fullPingResponses does not already contain localNode
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
    // add the local node to fullPingResponses
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

    // filter responses
    // if nodes with node.master=false are excluded from master election, drop non-master-eligible nodes
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    // activeMasters records masters that already exist in the cluster
    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        // (if a node could vote for itself, a network partition that cuts it off from the cluster
        // could let it become master without any verification from other nodes)
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            // filters out self-votes: a reported master is added to activeMasters only if it is not
            // the local node. The local node has just started and an active master may already exist,
            // so the local node participates only as a candidate. The PingResponse added above via
            // fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()))
            // is skipped by this check, but when no active master exists the local node can still
            // take part in the candidate election below.
            activeMasters.add(pingResponse.master());
        }
    }

    // masterCandidates records the nodes that are configured to be master-eligible
    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    // add every master-eligible node from the responses to the candidate list
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // note: isMasterNode does not say the node IS the master, only that it CAN become master
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }
    // if there is no existing active master, elect one from the candidate list
    if (activeMasters.isEmpty()) {
        // the number of candidates must reach the configured minimum number of master nodes
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            // the candidate with the newest cluster state (highest clusterStateVersion) wins;
            // ties are broken in favor of the smaller node
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                        masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        // otherwise pick among the already active masters,
        // first making sure the local node is not among them
        // (when other nodes report an active master, the local node must not elect itself)
        assert !activeMasters.contains(localNode) :
            "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes:
        // a master-eligible node beats a non-eligible one; if both are eligible (or both not),
        // the node ids are compared and the smaller id wins
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}

This is the core of the election. pingAndWait first collects the responses from pinging the other configured nodes; the list is asserted not to contain the local node, and a PingResponse whose master is null is then added for it via fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState())). If nodes with node.master set to false are excluded from master election, the non-eligible nodes are filtered out. When building the activeMasters list the local node is skipped so that a node cannot vote for itself (otherwise, under a network partition or other network problem, a node could become master without validation from any other node). If activeMasters is not empty, tieBreakActiveMasters picks one of them, preferring master-eligible nodes and then the smallest node id. If activeMasters is empty, the election falls back to the candidate list built from pingResponses (note that this time the local node is included), and the candidate with the newest cluster state wins, with ties broken by node id (some validation steps are omitted here).
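The two selection rules can be reproduced with plain comparators: among candidates the newest cluster state wins and ties go to the smaller node id, while among already-active masters the sketch below tie-breaks by node id alone (the real tieBreakActiveMasters additionally prefers master-eligible nodes). ElectionSketch and its Candidate record are hypothetical stand-ins for ElectMasterService:

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for ElectMasterService's ordering rules.
class ElectionSketch {
    record Candidate(String nodeId, long clusterStateVersion) {}

    // electMaster: newest cluster state first, then lowest node id.
    static Candidate electMaster(List<Candidate> candidates) {
        Comparator<Candidate> byVersionDesc =
            Comparator.<Candidate>comparingLong(Candidate::clusterStateVersion).reversed();
        return candidates.stream()
            .min(byVersionDesc.thenComparing(Candidate::nodeId))
            .orElseThrow();
    }

    // tieBreakActiveMasters (simplified): lowest node id among active masters.
    static String tieBreakActiveMasters(List<String> activeMasters) {
        return activeMasters.stream().min(Comparator.naturalOrder()).orElseThrow();
    }
}
```

Preferring the highest cluster state version means the elected master has seen the most recent metadata, which minimizes the chance of electing a node with a stale view of the cluster.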

The joinElectedMaster(masterNode) method

This method is called when the elected masterNode is not the local node; it makes the local node join the master. The code:

/**
 * Join a newly elected master.
 *
 * @return true if successful
 */
private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        // first, make sure we can connect to the master
        transportService.connectToNode(masterNode);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
        // on any connection failure, bail out immediately
        return false;
    }
    int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
    while (true) {
        try {
            logger.trace("joining master {}", masterNode);
            // send the join request to the master
            membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout);
            return true;
        } catch (Exception e) {
            final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
            if (unwrap instanceof NotMasterException) {
                // the target is not (yet) an elected master: retry,
                // giving up once the maximum number of attempts is reached
                if (++joinAttempt == this.joinRetryAttempts) {
                    logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode,
                        ExceptionsHelper.detailedMessage(e), joinAttempt);
                    return false;
                } else {
                    logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode,
                        ExceptionsHelper.detailedMessage(e), joinAttempt);
                }
            } else {
                // any other failure: return false without retrying
                if (logger.isTraceEnabled()) {
                    logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                } else {
                    logger.info("failed to send join request to master [{}], reason [{}]", masterNode,
                        ExceptionsHelper.detailedMessage(e));
                }
                return false;
            }
        }

        try {
            // wait between retries
            Thread.sleep(this.joinRetryDelay.millis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

The actual join happens in this fragment:

while (true) {
    try {
        logger.trace("joining master {}", masterNode);
        // send the join request to the master
        membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout);
        return true;
    } catch (Exception e) {
    ---------------

membership's sendJoinRequestBlocking method attempts to join the master; on failure it retries at the configured interval until the maximum number of retries is reached.
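The retry policy, retry only on NotMasterException, give up after joinRetryAttempts tries, and sleep joinRetryDelay between attempts, can be isolated into a small runnable model. NotMaster and sendJoin are hypothetical stand-ins for NotMasterException and membership.sendJoinRequestBlocking, and the interrupt handling is simplified:

```java
// Minimal model of joinElectedMaster's retry loop.
class JoinRetry {
    static class NotMaster extends RuntimeException {}

    static boolean join(Runnable sendJoin, int maxAttempts, long delayMillis) {
        int attempt = 0;
        while (true) {
            try {
                sendJoin.run();              // throws until the master is ready
                return true;                 // join acknowledged
            } catch (NotMaster e) {
                if (++attempt == maxAttempts) {
                    return false;            // give up after joinRetryAttempts tries
                }
            } catch (RuntimeException e) {
                return false;                // any other failure: no retry
            }
            try {
                Thread.sleep(delayMillis);   // joinRetryDelay between attempts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;                // simplified: the real code keeps looping
            }
        }
    }
}
```

Retrying only on NotMasterException matters: it distinguishes a transient condition (the elected node has not finished becoming master) from failures where retrying against the same node cannot help.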

Next, the org.elasticsearch.discovery.zen.MembershipAction#sendJoinRequestBlocking method:

public void sendJoinRequestBlocking(DiscoveryNode masterNode, DiscoveryNode node, TimeValue timeout) {
    transportService.submitRequest(masterNode, DISCOVERY_JOIN_ACTION_NAME, new JoinRequest(node),
        EmptyTransportResponseHandler.INSTANCE_SAME).txGet(timeout.millis(), TimeUnit.MILLISECONDS);
}

This method submits a request to the master node's join action; on the master side the response is produced by JoinRequestRequestHandler, of which a brief excerpt:

private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

    @Override
    public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {
        listener.onJoin(request.getNode(), new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    onFailure(e);
                }
            }
            -------------------------

We will not go deeper here. Note that the listener in listener.onJoin is the MembershipListener instantiated in ZenDiscovery's constructor; it reacts to the result of this node joining the master and performs the follow-up work, which we will analyze in a later article.
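The callback shape of the handler, receive a join request, hand it to a listener, then acknowledge or reject over the transport channel, can be modeled with a few small interfaces. Every name below is a hypothetical stand-in for the real transport types:

```java
// Hypothetical model of JoinRequestRequestHandler's callback flow: the
// transport layer hands the join to a listener, which acks or rejects
// over the channel.
class JoinHandlerSketch {
    interface Channel {
        void sendResponse(String payload);
        void sendError(Exception e);
    }

    interface JoinCallback {
        void onSuccess();
        void onFailure(Exception e);
    }

    interface JoinListener {
        void onJoin(String nodeId, JoinCallback cb);
    }

    static void messageReceived(String joiningNode, Channel channel, JoinListener listener) {
        listener.onJoin(joiningNode, new JoinCallback() {
            @Override public void onSuccess() { channel.sendResponse("EMPTY"); } // cf. TransportResponse.Empty
            @Override public void onFailure(Exception e) { channel.sendError(e); }
        });
    }
}
```

The callback indirection lets the listener complete the join asynchronously (for example, only after the new cluster state has been published) while still replying on the original channel.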

Originally published 2020-03-11 on the WeChat public account 开发架构二三事, shared via the Tencent Cloud self-media program.
