Elasticsearch 底层系列之分片恢复解析

    我们是基础架构部,腾讯云 CES/CTSDB 产品后台服务的支持团队,我们拥有专业的ES开发运维能力,为大家提供稳定、高性能的服务,欢迎有需求的童鞋接入,同时也欢迎各位交流 Elasticsearch、Lucene 相关的技术!

1. 前言

    在线上生产环境中,对于大规模的ES集群出现节点故障的场景比较多,例如,网络分区、机器故障、集群压力等等,都会导致节点故障。当外在环境恢复后,节点需要重新加入集群,那么当节点重新加入集群时,由于ES的自平衡策略,需要将某些分片恢复到新加入的节点上,那么ES的分片恢复流程是如何进行的呢?遇到分片恢复的坑该如何解决呢?(这里线上用户有碰到过,当恢复的并发调得较大时,会触发es的bug导致分布式死锁)?分片恢复的完整性、一致性如何保证呢?,本文将通过ES源码一窥究竟。注:ES分片恢复的场景有多种,本文只剖析最复杂的场景--peer recovery。

2. 分片恢复总体流程

    ES副本分片恢复主要涉及恢复的目标节点和源节点,目标节点即故障恢复的节点,源节点为提供恢复的节点。目标节点向源节点发送分片恢复请求,源节点接收到请求后主要分两阶段来处理。第一阶段,对需要恢复的shard创建snapshot,然后根据请求中的metadata对比如果 syncid 相同且 doc 数量相同则跳过,否则对比shard的segment文件差异,将有差异的segment文件发送给target node。第二阶段,为了保证target node数据的完整性,需要将本地的translog发送给target node,且对接收到的translog进行回放。整体流程如下图所示。

分片恢复流程

    以上为恢复的总体流程,具体实现细节,下面将结合源码进行解析。

3. 副本分片流程

3.1. 目标节点请求恢复

    本节,我们通过源码来剖析副本分片的详细恢复流程。ES根据metadata的变化来驱动各个模块工作,副本分片恢复的起始入口为IndicesClusterStateService.createOrUpdateShards,这里首先会判断本地节点是否在routingNodes中,如果在,说明本地节点有分片创建或更新的需求,否则跳过。逻辑如下:

private void createOrUpdateShards(final ClusterState state) {
    RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
    if (localRoutingNode == null) {
        return;
    }
    DiscoveryNodes nodes = state.nodes();
    RoutingTable routingTable = state.routingTable();
    for (final ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        if (failedShardsCache.containsKey(shardId) == false) {
            AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
            Shard shard = indexService.getShardOrNull(shardId.id());
            if (shard == null) { // shard不存在则需创建
                createShard(nodes, routingTable, shardRouting, state);
            } else { // 存在则更新
                updateShard(nodes, shardRouting, shard, routingTable, state);
            }
        }
    }
}

    副本分片恢复走的是createShard分支,在该方法中,首先获取shardRouting的类型,如果恢复类型为PEER,说明该分片需要从远端获取,则需要找到源节点,然后调用IndicesService.createShard:

private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
    DiscoveryNode sourceNode = null;
    if (shardRouting.recoverySource().getType() == Type.PEER)  {
        sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); // 如果恢复方式是peer,则会找到shard所在的源节点进行恢复
        if (sourceNode == null) {
            return;
        }
    }
        RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
        indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler);
        ... ...
}

private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
    DiscoveryNode sourceNode = null;
    if (!shardRouting.primary()) {
        ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
        if (primary.active()) {
            sourceNode = nodes.get(primary.currentNodeId()); // 找到primary shard所在节点
        }
    } else if (shardRouting.relocatingNodeId() != null) {
        sourceNode = nodes.get(shardRouting.relocatingNodeId()); // 找到搬迁的源节点
    } else {
         ... ...
    }
    return sourceNode;
}

    源节点的确定分两种情况,如果当前shard本身不是primary shard,则源节点为primary shard所在节点,否则,如果当前shard正在搬迁中(从其他节点搬迁到本节点),则源节点为数据搬迁的源头节点。得到源节点后调用IndicesService.createShard,在该方法中调用方法IndexShard.startRecovery开始恢复。对于恢复类型为PEER的任务,恢复动作的真正执行者为PeerRecoveryTargetService.doRecovery。在该方法中,首先获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,然后封装为 StartRecoveryRequest,通过RPC发送到源节点:

... ...
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
... ...
// 创建recovery quest 
request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
... ...
// 向源节点发送请求,请求恢复
cancellableThreads.execute(() -> responseHolder.set(
        transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                new FutureTransportResponseHandler<RecoveryResponse>() {
                    @Override
                    public RecoveryResponse newInstance() {
                        return new RecoveryResponse();
                    }
                }).txGet()));

    注意,请求的发送是异步的,但是这里会调用 PlainTransportFuture.txGet() 方法,等待对端的回复,否则将一直 阻塞 。至此,目标节点已将请求发送给源节点,源节点的执行逻辑随后详细分析。

3.2 源节点处理恢复请求

    源节点接收到请求后会调用恢复的入口函数recover:

class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
        RecoveryResponse response = recover(request);
        channel.sendResponse(response);
    }
}

    recover方法根据request得到shard并构造RecoverySourceHandler对象,然后调用handler.recoverToTarget进入恢复的执行体:

public RecoveryResponse recoverToTarget() throws IOException { // 恢复分为两阶段
    try (Translog.View translogView = shard.acquireTranslogView()) { 
        final IndexCommit phase1Snapshot;
        try {
            phase1Snapshot = shard.acquireIndexCommit(false);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(translogView);
            throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
        }
        try {
            phase1(phase1Snapshot, translogView);  // 第一阶段,比较syncid和segment,然后得出有差异的部分,主动将数据推送给请求方
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
        } finally {
            try {
                shard.releaseIndexCommit(phase1Snapshot);
            } catch (IOException ex) {
                logger.warn("releasing snapshot caused exception", ex);
            }
        }
        // engine was just started at the end of phase 1
        if (shard.state() == IndexShardState.RELOCATED) {
            throw new IndexShardRelocatedException(request.shardId());
        }
        try {
            phase2(translogView.snapshot()); // 第二阶段,发送translog
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
        }
        finalizeRecovery();
    }
    return response;
}

    从上面的代码可以看出,恢复主要分两个阶段,第一阶段恢复segment文件,第二阶段发送translog。这里有个关键的地方,在恢复前,首先需要获取translogView及segment snapshot,translogView的作用是保证当前时间点到恢复结束时间段的translog不被删除,segment snapshot的作用是保证当前时间点之前的segment文件不被删除。接下来看看两阶段恢复的具体执行逻辑。phase1:

public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
    final Store store = shard.store(); //拿到shard的存储信息
    recoverySourceMetadata = store.getMetadata(snapshot); // 拿到snapshot的metadata
    String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
            String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
            final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
            if (recoverWithSyncId) { // 如果syncid相等,再继续比较下文档数,如果都相同则不用恢复
    final long numDocsTarget = request.metadataSnapshot().getNumDocs();
    final long numDocsSource = recoverySourceMetadata.getNumDocs();
    if (numDocsTarget != numDocsSource) {
        throw new IllegalStateException("... ...");
    } 
} else {
	final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); // 找出target和source有差别的segment
	List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
	phase1Files.addAll(diff.different);
	phase1Files.addAll(diff.missing);
	... ...
	final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
        md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
	sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // 将需要恢复的文件发送到target node
	... ...
    }
    prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());

    从上面代码可以看出,phase1的具体逻辑是,首先拿到待恢复shard的metadataSnapshot从而得到recoverySourceSyncId,根据request拿到recoveryTargetSyncId,比较两边的syncid,如果相同再比较源和目标的文档数,如果也相同,说明在当前提交点之前源和目标的shard对应的segments都相同,因此不用恢复segment文件。如果两边的syncid不同,说明segment文件有差异,则需要找出所有有差异的文件进行恢复。通过比较recoverySourceMetadata和recoveryTargetSnapshot的差异性,可以找出所有有差别的segment文件。这块逻辑如下:

public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
    final List<StoreFileMetaData> identical = new ArrayList<>();  // 相同的file 
    final List<StoreFileMetaData> different = new ArrayList<>();  // 不同的file
    final List<StoreFileMetaData> missing = new ArrayList<>();   // 缺失的file
    final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
    final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
    ... ...
    for (List<StoreFileMetaData> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFileMetaData meta : segmentFiles) {
            StoreFileMetaData storeFileMetaData = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetaData == null) {
                consistent = false;
                missing.add(meta); // 该segment在target node中不存在,则加入到missing
            } else if (storeFileMetaData.isSame(meta) == false) {
                consistent = false;
                different.add(meta); // 存在但不相同,则加入到different
            } else {
                identicalFiles.add(meta);  // 存在且相同
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // make sure all files are added - this can happen if only the deletes are different
            different.addAll(identicalFiles);
        }
    }
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    return recoveryDiff;
}

    这里将所有的segment file分为三类:identical(相同)、different(不同)、missing(target缺失)。然后将different和missing的segment files作为第一阶段需要恢复的文件发送到target node。发送完segment files后,源节点还会向目标节点发送消息以通知目标节点清理临时文件,然后也会发送消息通知目标节点打开引擎准备接收translog,这里需要注意的是,这两次网络通信都会调用 PlainTransportFuture.txGet() 方法阻塞等待 对端回复。至此,第一阶段的恢复逻辑完毕。

    第二阶段的逻辑比较简单,只需将translog view到当前时间之间的所有translog发送给源节点即可。

3.3 目标节点开始恢复

  • 接收segment

    对应上一小节源节点恢复的第一阶段,源节点将所有有差异的segment发送给目标节点,目标节点接收到后会将segment文件落盘。segment files的写入函数为RecoveryTarget.writeFileChunk:

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {
    final Store store = store();
    final String name = fileMetaData.name();
    ... ...
    if (position == 0) {
        indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
    } else {
        indexOutput = getOpenIndexOutput(name); // 加一层前缀,组成临时文件
    }
    ... ...
    while((scratch = iterator.next()) != null) { 
        indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // 写临时文件
    }
    ... ...
    store.directory().sync(Collections.singleton(temporaryFileName));  // 这里会调用fsync落盘
}
  • 打开引擎

    经过上面的过程,目标节点完成了追数据的第一步。接收完segment后,目标节点打开shard对应的引擎准备接收translog,注意,这里打开引擎后,正在恢复的shard便可进行写入、删除(操作包括primary shard同步的请求和translog中的操作命令)。打开引擎的逻辑如下:

private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
    ... ...
    recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
    final EngineConfig.OpenMode openMode;
    if (indexExists == false) {
        openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
    } else if (skipTranslogRecovery) {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
    } else {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
    }
    final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
    // we disable deletes since we allow for operations to be executed against the shard while recovering
    // but we need to make sure we don't loose deletes until we are done recovering
    config.setEnableGcDeletes(false); // 恢复过程中不删除translog
    Engine newEngine = createNewEngine(config); // 创建engine
    ... ...
}
  • 接收并重放translog

    打开引擎后,便可以根据translog中的命令进行相应的回放动作,回放的逻辑和正常的写入、删除类似,这里需要根据translog还原出操作类型和操作数据,并根据操作数据构建相应的数据对象,然后再调用上一步打开的engine执行相应的操作,这块逻辑如下:

private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
    switch (operation.opType()) { // 还原出操作类型及操作数据并调用engine执行相应的动作
        case INDEX:
            Translog.Index index = (Translog.Index) operation;           
            // ...  根据index构建engineIndex对象 ...
            maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
            index(engine, engineIndex); // 执行写入操作
            break;
        case DELETE:
            Translog.Delete delete = (Translog.Delete) operation;
            // ...  根据delete构建engineDelete对象 ...
            delete(engine, engineDelete); // 执行删除操作
            break;
        default:
            throw new IllegalStateException("No operation defined for [" + operation + "]");
    }
}

    通过上面的步骤,translog的重放完毕,此后需要做一些收尾的工作,包括,refresh让回放后的最新数据可见,打开translog gc:

public void finalizeRecovery() {
    recoveryState().setStage(RecoveryState.Stage.FINALIZE);
    Engine engine = getEngine();
    engine.refresh("recovery_finalization"); 
    engine.config().setEnableGcDeletes(true);
}

    到这里,replica shard恢复的两个阶段便完成了,由于此时shard还处于INITIALIZING状态,还需通知master节点启动已恢复的shard:

private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
    @Override
    public void onRecoveryDone(RecoveryState state) {
        if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
            SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
            restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
        }
        shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
    }
}

    至此,shard recovery的所有流程都已完成。

4. 答疑解惑

    通过上述源码剖析后,本节将对文章开头抛出的几个问题进行答疑解惑,加深大家对分片恢复的理解。

  • 分布式死锁     通过上述源码的分析,大家注意3.1和3.2末尾处加粗的地方,可以看出,源节点和目标节点都有调用PlainTransportFuture.txGet()方法阻塞线程同步返回结果,这是导致死锁的关键点。具体问题描述及处理方法见https://cloud.tencent.com/developer/article/1370318,大家可以结合本文源码分析搞清楚死锁的原因。
  • 完整性     首先,phase1阶段,保证了存量的历史数据可以恢复到从分片。phase1阶段完成后,从分片引擎打开,可以正常处理index、delete请求,而translog覆盖完了整个phase1阶段,因此在phase1阶段中的index/delete操作都将被记录下来,在phase2阶段进行translog回放时,副本分片正常的index和delete操作和translog是并行执行的,这就保证了恢复开始之前的数据、恢复中的数据都会完整的写入到副本分片,保证了数据的完整性。如下图所示:
从分片恢复时序图
  • 一致性

    由于phase1阶段完成后,从分片便可正常处理写入操作,而此时从分片的写入和phase2阶段的translog回放时并行执行的,如果translog的回放慢于正常的写入操作,那么可能会导致老的数据后写入,造成数据不一致。ES为了保证数据的一致性在进行写入操作时,会比较当前写入的版本和lucene文档版本号,如果当前版本更小,说明是旧数据则不会将文档写入lucene。相关代码如下:

final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.skipAsStale(false, index.version());
} 

5. 小结

    本文结合ES源码详细分析了副本分片恢复的具体流程,并通过对源码的理解对文章开头提出的问题进行答疑解惑。后面,我们也将推出更多ES相关的文章,欢迎大家多多关注和交流。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏智能算法

Python学习(九)---- python中的线程

原文地址: https://blog.csdn.net/fgf00/article/details/52773459 编辑:智能算法,欢迎关注! 上期我们一起学...

18220
来自专栏程序员八阿哥

王老板Python面试(9):整理的最全 python常见面试题(基本必考)

1)迭代器是一个更抽象的概念,任何对象,如果它的类有next方法和iter方法返回自己本身。对于string、list、dict、tuple等这类容器对象,使用...

23610
来自专栏向治洪

Android热插拔事件处理详解

一、Android热插拔事件处理流程图 Android热插拔事件处理流程如下图所示: ? 二、组成 1. NetlinkManager:       ...

87570
来自专栏闵开慧

tomcat6.0下找不到jasper-runtime.jar

今天有点需求,需要用jasper-runtime.jar包。但是我在我的\apache-tomcat-6.0.16\lib目录下,怎么也找不到这个jar包。结果...

36350
来自专栏编码小白

ofbiz初级教程

本教程是ofbiz 基本应用,它涵盖了OFBiz应用程序开发过程的基本原理。目标是使开发人员熟悉最佳实践,编码惯例,基本控制流程以及开发人员对OFBiz定制所需...

1.9K30
来自专栏草根专栏

用ASP.NET Core 2.0 建立规范的 REST API -- 预备知识 + 项目准备

REST 是 Representational State Transfer 的缩写. 它是一种架构的风格, 这种风格基于一套预定义的规则, 这些规则描述了网络...

1.2K60
来自专栏步履前行

Spring Retry

  在我们的业务场景中,经常要调用其他的API来获取信息,比如我们的业务场景需要依赖个人信息来处理,这个时候调用个人信息服务的API,但是由于可能同一时段多方在...

45930
来自专栏SDNLAB

OpenDaylight Lithium-SR2 Cluster集群搭建

目的 希望大家能够通过本教程对OpenDaylight集群的基本概念如shard/基本配置有所了解,感受OpenDaylight的High Availabili...

37150
来自专栏java思维导图

开源项目renren-fast解读,让java不再难懂(二)

1、百度百科的解释: XSS又叫CSS (Cross Site Script) ,跨站脚本攻击。它指的是恶意攻击者往Web页面里插入恶意html代码,当用户浏览...

29940
来自专栏Java帮帮-微信公众号-技术文章全总结

Web-第三十一天 WebService学习【悟空教程】

简单的网络应用使用单一语言写成,它的唯一外部程序就是它所依赖的数据库。大家想想是不是这样呢?

24240

扫码关注云+社区

领取腾讯云代金券