
A Look at Elasticsearch's Two-Phase Commit (2PC)

Original article
Author: 用户1233856
Published: 2022-08-07 16:03:08
Collected in the column: 白码晚股

Two-phase commit

Two-phase commit (2PC) is an algorithm that keeps a transaction consistent across all nodes of a distributed architecture.

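Assumptions

  1. Two roles: a coordinator and participants (cohorts). The two can communicate with each other via RPC.
  2. Undo/redo: every node keeps a write-ahead log, and the log is persisted on reliable storage.
  3. Reliable nodes: no node is lost permanently; a node that fails can still be recovered.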

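Process

  1. Phase one, the voting phase: each participant votes on whether to go ahead with the commit.
  2. Phase two, the completion phase: whatever the outcome of the vote, the coordinator must end the current transaction in this phase.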
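
The two phases above can be sketched in a few lines of Java. This is only a minimal in-memory illustration under simplifying assumptions (no RPC, no write-ahead log, no crash recovery). The names `Cohort`, `RecordingCohort` and `runTransaction` are hypothetical and do not come from Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory sketch of two-phase commit.
// All names are hypothetical; nothing here is Elasticsearch API.
class TwoPhaseCommitSketch {

    /** A participant that votes in phase one and applies the decision in phase two. */
    interface Cohort {
        boolean prepare(String txId);  // phase one: vote yes (true) or no (false)
        void commit(String txId);      // phase two: make the prepared change final
        void rollback(String txId);    // phase two: undo the prepared change
    }

    /** Simple cohort that records every call it receives. */
    static class RecordingCohort implements Cohort {
        final boolean voteYes;
        final List<String> log = new ArrayList<>();

        RecordingCohort(boolean voteYes) { this.voteYes = voteYes; }

        @Override public boolean prepare(String txId) { log.add("prepare"); return voteYes; }
        @Override public void commit(String txId) { log.add("commit"); }
        @Override public void rollback(String txId) { log.add("rollback"); }
    }

    /** Coordinator: commits only if every cohort votes yes, otherwise rolls back. */
    static boolean runTransaction(String txId, List<? extends Cohort> cohorts) {
        boolean allYes = true;
        // Phase one (voting): ask every cohort whether it can commit.
        for (Cohort c : cohorts) {
            boolean vote;
            try {
                vote = c.prepare(txId);
            } catch (RuntimeException e) {
                vote = false; // a failing cohort counts as a "no" vote
            }
            allYes &= vote;
        }
        // Phase two (completion): the coordinator always ends the transaction,
        // either by committing everywhere or by rolling back everywhere.
        for (Cohort c : cohorts) {
            if (allYes) {
                c.commit(txId);
            } else {
                c.rollback(txId);
            }
        }
        return allYes;
    }

    public static void main(String[] args) {
        List<RecordingCohort> cohorts = new ArrayList<>();
        cohorts.add(new RecordingCohort(true));
        cohorts.add(new RecordingCohort(false));
        boolean committed = runTransaction("tx-1", cohorts);
        System.out.println("committed=" + committed + ", first cohort log=" + cohorts.get(0).log);
    }
}
```

A single "no" vote is enough to roll back every cohort, including those that voted yes. That is the property the voting phase exists to guarantee.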
[Figure: flowchart]

Stack trace

[Figure: stack trace]

Code analysis

PrioritizedEsThreadPoolExecutor

        @Override
        public void run() {
            synchronized (this) {
                // mark the task as started. This is needed for synchronization with the timeout handling
                // see #scheduleTimeout()
                started = true;
                FutureUtils.cancel(timeoutFuture);
            }
            runAndClean(runnable);
        }
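
The `started` flag in the excerpt guards against a race between running the task and timing it out. The sketch below shows one way such a guard can work, as a simplified illustration only: the class `TimedTask` and its `timeout()` method are hypothetical, and the real executor handles the timeout differently (through a scheduled future and its task queue).

```java
// Simplified sketch of a "run vs. timeout" race guard.
// TimedTask and timeout() are hypothetical names, not Elasticsearch API.
class TimedTaskSketch {

    static class TimedTask implements Runnable {
        private final Runnable delegate;   // the actual work
        private final Runnable onTimeout;  // what to do if the deadline passes first
        private boolean started;
        private boolean timedOut;

        TimedTask(Runnable delegate, Runnable onTimeout) {
            this.delegate = delegate;
            this.onTimeout = onTimeout;
        }

        @Override
        public void run() {
            synchronized (this) {
                if (timedOut) {
                    return; // the timeout won the race, so skip the work
                }
                started = true; // from now on a timeout must not fire
            }
            delegate.run();
        }

        /** Called by a timer thread once the deadline has passed. */
        void timeout() {
            synchronized (this) {
                if (started || timedOut) {
                    return; // the task already started, or the timeout already fired
                }
                timedOut = true;
            }
            onTimeout.run();
        }
    }

    public static void main(String[] args) {
        TimedTask task = new TimedTask(
            () -> System.out.println("task ran"),
            () -> System.out.println("task timed out"));
        task.run();      // the task starts first ...
        task.timeout();  // ... so this late timeout is ignored
    }
}
```

Both methods take the same lock before reading or writing the flags, so exactly one of the two callbacks runs for a given task.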

MasterService

                logger.debug("publishing cluster state version [{}]", newClusterState.version());
                try {
                    clusterStatePublisher.accept(clusterChangedEvent, taskOutputs.createAckListener(threadPool, newClusterState));
                } catch (Discovery.FailedToCommitClusterStateException t) {
                    final long version = newClusterState.version();
                    logger.warn(() -> new ParameterizedMessage(
                            "failing [{}]: failed to commit cluster state version [{}]", summary, version), t);
                    taskOutputs.publishingFailed(t);
                    return;
                }

ZenDiscovery

        try {
            publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
        } catch (FailedToCommitClusterStateException t) {
            // cluster service logs a WARN message
            logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                newState.version(), electMaster.minimumMasterNodes());

            synchronized (stateMutex) {
                pendingStatesQueue.failAllStatesAndClear(
                    new ElasticsearchException("failed to publish cluster state"));

                rejoin("zen-disco-failed-to-publish");
            }
            throw t;
        }

PublishClusterStateAction

        try {
            innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,
                serializedDiffs);
        } catch (Discovery.FailedToCommitClusterStateException t) {
            throw t;
        } catch (Exception e) {
            // try to fail the commit, in case it's still ongoing
            if (sendingController.markAsFailed("unexpected error", e)) {
                // signal the change should be rejected
                throw new Discovery.FailedToCommitClusterStateException("unexpected error", e);
            } else {
                throw e;
            }
        }


            private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,
                              final SendingController sendingController, final Discovery.AckListener ackListener,
                              final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,
                              final Map<Version, BytesReference> serializedDiffs) {

        final ClusterState clusterState = clusterChangedEvent.state();
        final ClusterState previousState = clusterChangedEvent.previousState();
        final TimeValue publishTimeout = discoverySettings.getPublishTimeout();

        final long publishingStartInNanos = System.nanoTime();

        // 2PC phase one: send the cluster state to every node
        for (final DiscoveryNode node : nodesToPublishTo) {
            // try and serialize the cluster state once (or per version), so we don't serialize it
            // per node when we send it over the wire, compress it while we are at it...
            // we don't send full version if node didn't exist in the previous version of cluster state
            if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
            } else {
                sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
            }
        }

        // abort if the commit times out or too few nodes respond
        sendingController.waitForCommit(discoverySettings.getCommitTimeout());

        final long commitTime = System.nanoTime() - publishingStartInNanos;

        ackListener.onCommit(TimeValue.timeValueNanos(commitTime));
        // 2PC phase two: wait for the nodes to respond, within what is left of the publish timeout
        try {
            long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);
            final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
            sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
            if (sendingController.getPublishingTimedOut()) {
                DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
                // everyone may have just responded
                if (pendingNodes.length > 0) {
                    logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                        clusterState.version(), publishTimeout, pendingNodes);
                }
            }
            // The failure is logged under debug when a sending failed. we now log a summary.
            Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();
            if (failedNodes.isEmpty() == false) {
                logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",
                    clusterChangedEvent.state().version(), failedNodes);
            }
        } catch (InterruptedException e) {
            // ignore & restore interrupt
            Thread.currentThread().interrupt();
        }
    }

    // publishing: send the serialized cluster state to a single node
    private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                        final DiscoveryNode node,
                                        final TimeValue publishTimeout,
                                        final SendingController sendingController,
                                        final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
        try {

            transportService.sendRequest(node, SEND_ACTION_NAME,
                    new BytesTransportRequest(bytes, node.getVersion()),
                    stateRequestOptions,
                    new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {

                        @Override
                        public void handleResponse(TransportResponse.Empty response) {
                            if (sendingController.getPublishingTimedOut()) {
                                logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                                    clusterState.version(), publishTimeout);
                            }
                            sendingController.onNodeSendAck(node);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
                                logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());
                                sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                            } else {
                                logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);
                                sendingController.onNodeSendFailed(node, exp);
                            }
                        }
                    });
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
            sendingController.onNodeSendFailed(node, e);
        }
    }
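
In the excerpt above, `waitForCommit` blocks until enough nodes have acknowledged the state or the commit timeout elapses, and `markAsFailed` reports whether the commit could still be failed. The sketch below illustrates that idea in isolation. It is an assumption-laden simplification: `CommitTracker` and its methods are hypothetical and are not the real `SendingController`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of "commit once enough nodes acknowledged, otherwise fail".
// CommitTracker is a hypothetical name, not the real SendingController.
class QuorumCommitSketch {

    static class CommitTracker {
        private final CountDownLatch decided = new CountDownLatch(1);
        private int pendingAcks;
        private boolean committed;

        CommitTracker(int requiredAcks) {
            if (requiredAcks < 1) {
                throw new IllegalArgumentException("requiredAcks must be >= 1");
            }
            this.pendingAcks = requiredAcks;
        }

        /** A node acknowledged the phase-one request. */
        synchronized void onAck() {
            if (decided.getCount() == 0) {
                return; // already committed or failed, so late acks change nothing
            }
            if (--pendingAcks == 0) {
                committed = true;
                decided.countDown();
            }
        }

        /** Returns true if the commit was still undecided and is now failed. */
        synchronized boolean markAsFailed() {
            if (decided.getCount() == 0) {
                return false;
            }
            decided.countDown();
            return true;
        }

        /** Waits for the decision; a timeout or an interrupt fails the commit. */
        boolean waitForCommit(long timeout, TimeUnit unit) {
            try {
                if (!decided.await(timeout, unit)) {
                    markAsFailed();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                markAsFailed();
            }
            synchronized (this) {
                return committed;
            }
        }
    }

    public static void main(String[] args) {
        CommitTracker tracker = new CommitTracker(2);
        tracker.onAck();
        tracker.onAck();
        System.out.println("committed=" + tracker.waitForCommit(100, TimeUnit.MILLISECONDS));
    }
}
```

Once the decision is made it never changes: an acknowledgement that arrives after the timeout cannot turn a failed commit into a successful one.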

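Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.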
