ES Series (5): How a get of a single document is processed

Author: 烂猪皮
Published: 2021-06-10 14:28:07
Included in the column: JAVA烂猪皮

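The previous posts covered fairly large topics at the framework level. Today let's take on something lighter and cover just one point: as the title says, how ES implements the get query for a single record.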

1. Semantics of get

get retrieves a single document from ES by its primary key, the id. In relational-database terms, it is roughly equivalent to the following SQL:

select * from test where id = #{id};

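Of course, every operation in ES comes with plenty of additional options, for example selecting which fields to return, specifying a version, and so on.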

2. Brief overview of how get is implemented

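Semantically, a get returns at most one record. So although ES stores data across a cluster, here we only need to fetch one document from one node. In theory, as long as we can quickly work out which ES node holds the data and send the request to it, we have the result.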

Also, since the lookup is by primary key id, it should be very efficient as long as the data structures are well designed.

So by analyzing how this feature is implemented, we also get a basic understanding of how ES distributes keys.
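
To make the "locate the node from the id" idea concrete, here is a minimal sketch. It is not Elasticsearch's code: the class `ShardRoutingSketch`, the method `shardFor` and the node names are hypothetical, and `String.hashCode()` is only a stand-in for the Murmur3 hash that Elasticsearch applies to the routing value (the `_id` by default).

```java
import java.util.List;

// Hypothetical sketch: map a document id (or a custom routing value) to a shard and its node.
class ShardRoutingSketch {
    // Pick a shard number from the routing value; falls back to the id when routing is null.
    static int shardFor(String id, String routing, int numberOfShards) {
        String effectiveRouting = routing != null ? routing : id;
        // Assumption: String.hashCode() is only a stand-in for the real hash function.
        return Math.floorMod(effectiveRouting.hashCode(), numberOfShards);
    }

    public static void main(String[] args) {
        // shardToNode.get(i) is the (made-up) node holding primary shard i.
        List<String> shardToNode = List.of("node-a", "node-b", "node-c");
        int shard = shardFor("doc-1", null, shardToNode.size());
        System.out.println("doc-1 -> shard " + shard + " on " + shardToNode.get(shard));
    }
}
```

Because the shard is a pure function of the routing value and the shard count, any node can compute where a document lives without asking the others, which is what makes a get by id cheap.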

3. Concrete implementation of get

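get is only one small part of the ES API. From the analysis in the previous post we already know how to find the handler that processes a get. Let's briefly review: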

// org.elasticsearch.rest.RestController#dispatchRequest
    @Override
    public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
        try {
            // try all candidate handlers
            tryAllHandlers(request, channel, threadContext);
        } catch (Exception e) {
            try {
                // on error, send the exception back as the response
                channel.sendResponse(new BytesRestResponse(channel, e));
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.error(() ->
                    new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
            }
        }
    }
    // org.elasticsearch.rest.RestController#tryAllHandlers
    private void tryAllHandlers(final RestRequest request, final RestChannel channel, final ThreadContext threadContext) throws Exception {
        // read the configured headers from the request and copy them into the thread context
        for (final RestHeaderDefinition restHeader : headersToCopy) {
            final String name = restHeader.getName();
            final List<String> headerValues = request.getAllHeaderValues(name);
            if (headerValues != null && headerValues.isEmpty() == false) {
                final List<String> distinctHeaderValues = headerValues.stream().distinct().collect(Collectors.toList());
                if (restHeader.isMultiValueAllowed() == false && distinctHeaderValues.size() > 1) {
                    channel.sendResponse(
                        BytesRestResponse.
                            createSimpleErrorResponse(channel, BAD_REQUEST, "multiple values for single-valued header [" + name + "]."));
                    return;
                } else {
                    threadContext.putHeader(name, String.join(",", distinctHeaderValues));
                }
            }
        }
        // error_trace cannot be used when we disable detailed errors
        // we consume the error_trace parameter first to ensure that it is always consumed
        if (request.paramAsBoolean("error_trace", false) && channel.detailedErrorsEnabled() == false) {
            channel.sendResponse(
                    BytesRestResponse.createSimpleErrorResponse(channel, BAD_REQUEST, "error traces in responses are disabled."));
            return;
        }

        final String rawPath = request.rawPath();
        final String uri = request.uri();
        final RestRequest.Method requestMethod;
        try {
            // Resolves the HTTP method and fails if the method is invalid
            requestMethod = request.method();
            // Loop through all possible handlers, attempting to dispatch the request
            // collect candidate handlers; because paths can contain wildcards or variables such as {index}, several handlers may match
            Iterator<MethodHandlers> allHandlers = getAllHandlers(request.params(), rawPath);
            while (allHandlers.hasNext()) {
                final RestHandler handler;
                // one MethodHandlers entry can serve several HTTP methods
                final MethodHandlers handlers = allHandlers.next();
                if (handlers == null) {
                    handler = null;
                } else {
                    handler = handlers.getHandler(requestMethod);
                }
                if (handler == null) {
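                  // no handler here does not mean the request cannot be handled: the search may need to continue; if it definitely cannot be handled, respond to the client and return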
                  if (handleNoHandlerFound(rawPath, requestMethod, uri, channel)) {
                      return;
                  }
                } else {
                    // handler found, invoke it
                    dispatchRequest(request, channel, handler);
                    return;
                }
            }
        } catch (final IllegalArgumentException e) {
            handleUnsupportedHttpMethod(uri, null, channel, getValidHandlerMethodSet(rawPath), e);
            return;
        }
        // If request has not been handled, fallback to a bad request error.
        // fallback call
        handleBadRequest(uri, requestMethod, channel);
    }
    // org.elasticsearch.rest.RestController#dispatchRequest
    private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
        final int contentLength = request.contentLength();
        if (contentLength > 0) {
            final XContentType xContentType = request.getXContentType();
            if (xContentType == null) {
                sendContentTypeErrorMessage(request.getAllHeaderValues("Content-Type"), channel);
                return;
            }
            if (handler.supportsContentStream() && xContentType != XContentType.JSON && xContentType != XContentType.SMILE) {
                channel.sendResponse(BytesRestResponse.createSimpleErrorResponse(channel, RestStatus.NOT_ACCEPTABLE,
                    "Content-Type [" + xContentType + "] does not support stream parsing. Use JSON or SMILE instead"));
                return;
            }
        }
        RestChannel responseChannel = channel;
        try {
            // circuit breaker check
            if (handler.canTripCircuitBreaker()) {
                inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
            } else {
                inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
            }
            // iff we could reserve bytes for the request we need to send the response also over this channel
            responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
            // TODO: Count requests double in the circuit breaker if they need copying?
            if (handler.allowsUnsafeBuffers() == false) {
                request.ensureSafeBuffers();
            }
            if (handler.allowSystemIndexAccessByDefault() == false && request.header(ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER) == null) {
                // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product with a plan
                // to move away from direct access to system indices, and thus deprecation warnings should not be emitted.
                // This header is intended for internal use only.
                client.threadPool().getThreadContext().putHeader(SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY, Boolean.FALSE.toString());
            }
            // invoke the handler; it may be wrapped by a filter that runs first
            handler.handleRequest(request, responseChannel, client);
        } catch (Exception e) {
            responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
        }
    }
    // org.elasticsearch.xpack.security.rest.SecurityRestFilter#handleRequest
    @Override
    public void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        if (licenseState.isSecurityEnabled() && request.method() != Method.OPTIONS) {
            // CORS - allow for preflight unauthenticated OPTIONS request
            if (extractClientCertificate) {
                HttpChannel httpChannel = request.getHttpChannel();
                SSLEngineUtils.extractClientCertificates(logger, threadContext, httpChannel);
            }

            final String requestUri = request.uri();
            authenticationService.authenticate(maybeWrapRestRequest(request), ActionListener.wrap(
                authentication -> {
                    if (authentication == null) {
                        logger.trace("No authentication available for REST request [{}]", requestUri);
                    } else {
                        logger.trace("Authenticated REST request [{}] as {}", requestUri, authentication);
                    }
                    secondaryAuthenticator.authenticateAndAttachToContext(request, ActionListener.wrap(
                        secondaryAuthentication -> {
                            if (secondaryAuthentication != null) {
                                logger.trace("Found secondary authentication {} in REST request [{}]", secondaryAuthentication, requestUri);
                            }
                            RemoteHostHeader.process(request, threadContext);
                            restHandler.handleRequest(request, channel, client);
                        },
                        e -> handleException("Secondary authentication", request, channel, e)));
                }, e -> handleException("Authentication", request, channel, e)));
        } else {
            // pass the request on to the next handler in the chain of responsibility
            restHandler.handleRequest(request, channel, client);
        }
    }
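
The lookup logic in tryAllHandlers() boils down to "find candidates by path, pick one by HTTP method, otherwise fall back to an error response". A minimal sketch of that idea follows. The class `DispatchSketch` and its methods are hypothetical, paths are matched exactly (the real code also handles wildcards and path variables), and responses are plain strings rather than REST responses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of "find a handler by path, then by HTTP method, else fall back".
class DispatchSketch {
    // path -> (HTTP method -> handler); a handler turns a request path into a response string.
    private final Map<String, Map<String, Function<String, String>>> handlers = new HashMap<>();

    void register(String method, String path, Function<String, String> handler) {
        handlers.computeIfAbsent(path, p -> new HashMap<>()).put(method, handler);
    }

    String dispatch(String method, String path) {
        Map<String, Function<String, String>> byMethod = handlers.get(path);
        if (byMethod == null) {
            return "400 no handler found for " + path;      // plays the role of handleBadRequest
        }
        Function<String, String> handler = byMethod.get(method);
        if (handler == null) {
            return "405 method " + method + " not allowed";  // path is known, method is not supported
        }
        try {
            return handler.apply(path);
        } catch (RuntimeException e) {
            return "500 " + e.getMessage();                  // errors become a response, not a crash
        }
    }
}
```

Registering `GET /test/_doc/1` and then dispatching a `PUT` to the same path yields the 405 branch, while an unknown path falls through to the 400 branch.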

In general, many concrete handlers extend BaseRestHandler, so the overall call sequence of handleRequest() is as follows:

[Figure: overall call sequence diagram of handleRequest()]

The concrete code is as follows:

// org.elasticsearch.rest.BaseRestHandler#handleRequest
    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        // it is called "prepare", but it matters a lot: it assembles the logic that is executed afterwards
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }

        usageCount.increment();
        // execute the action
        // i.e. this merely invokes the action prepared above
        action.accept(channel);
    }
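
The pattern in handleRequest() is worth spelling out: the subclass reads its parameters while preparing, the base class then rejects anything nobody read, and only after that does the prepared action run. Below is a minimal sketch of that template. All names (`PrepareThenExecuteSketch`, `Request`, `prepare`, `handle`) are hypothetical, and the "action" just writes a string instead of talking to a cluster.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical sketch of the "prepare, validate, then execute" template.
class PrepareThenExecuteSketch {
    // A tiny request that records which parameters were read.
    static class Request {
        private final Map<String, String> params;
        private final Set<String> consumed = new HashSet<>();
        Request(Map<String, String> params) { this.params = new HashMap<>(params); }
        String param(String name) { consumed.add(name); return params.get(name); }
        Set<String> unconsumed() {
            Set<String> rest = new TreeSet<>(params.keySet()); // sorted for stable error messages
            rest.removeAll(consumed);
            return rest;
        }
    }

    // Subclass-like step: read the parameters and return the deferred action.
    static Consumer<StringBuilder> prepare(Request request) {
        String index = request.param("index");
        String id = request.param("id");
        return out -> out.append("get ").append(index).append('/').append(id);
    }

    // Template step: prepare first, reject unknown parameters, only then run the action.
    static String handle(Request request) {
        Consumer<StringBuilder> action = prepare(request);
        Set<String> unconsumed = request.unconsumed();
        if (!unconsumed.isEmpty()) {
            throw new IllegalArgumentException("unrecognized parameters: " + unconsumed);
        }
        StringBuilder out = new StringBuilder();
        action.accept(out);
        return out.toString();
    }
}
```

The useful side effect of this design is that a typo in a query parameter is reported as an error instead of being silently ignored, because nothing ever "consumed" it.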

As for the handler that processes get, we can tell from the initial registration that it is RestGetAction; its prepareRequest() implementation shows how the request is handled.

// org.elasticsearch.rest.action.document.RestGetAction#prepareRequest
    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        GetRequest getRequest;
        if (request.hasParam("type")) {
            deprecationLogger.deprecate("get_with_types", TYPES_DEPRECATION_MESSAGE);
            getRequest = new GetRequest(request.param("index"), request.param("type"), request.param("id"));
        } else {
            getRequest = new GetRequest(request.param("index"), request.param("id"));
        }

        getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh()));
        getRequest.routing(request.param("routing"));
        getRequest.preference(request.param("preference"));
        getRequest.realtime(request.paramAsBoolean("realtime", getRequest.realtime()));
        if (request.param("fields") != null) {
            throw new IllegalArgumentException("the parameter [fields] is no longer supported, " +
                "please use [stored_fields] to retrieve stored fields or [_source] to load the field from _source");
        }
        final String fieldsParam = request.param("stored_fields");
        if (fieldsParam != null) {
            final String[] fields = Strings.splitStringByCommaToArray(fieldsParam);
            if (fields != null) {
                getRequest.storedFields(fields);
            }
        }

        getRequest.version(RestActions.parseVersion(request));
        getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType()));

        getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request));
        // wrap the actual processing logic
        // and hand it over to NodeClient
        return channel -> client.get(getRequest, new RestToXContentListener<GetResponse>(channel) {
            @Override
            protected RestStatus getStatus(final GetResponse response) {
                return response.isExists() ? OK : NOT_FOUND;
            }
        });
    }
    // org.elasticsearch.search.fetch.subphase.FetchSourceContext#parseFromRestRequest
    public static FetchSourceContext parseFromRestRequest(RestRequest request) {
        Boolean fetchSource = null;
        String[] sourceExcludes = null;
        String[] sourceIncludes = null;

        String source = request.param("_source");
        if (source != null) {
            if (Booleans.isTrue(source)) {
                fetchSource = true;
            } else if (Booleans.isFalse(source)) {
                fetchSource = false;
            } else {
                sourceIncludes = Strings.splitStringByCommaToArray(source);
            }
        }

        String sIncludes = request.param("_source_includes");
        if (sIncludes != null) {
            sourceIncludes = Strings.splitStringByCommaToArray(sIncludes);
        }

        String sExcludes = request.param("_source_excludes");
        if (sExcludes != null) {
            sourceExcludes = Strings.splitStringByCommaToArray(sExcludes);
        }

        if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
            return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
        }
        return null;
    }

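That is the prelude to get processing. It shows the supported parameters: index, type, id, refresh, routing, preference, realtime, stored_fields, version, version_type, _source, _source_includes, _source_excludes. The old fields parameter is read only so that it can be rejected with an error pointing to stored_fields and _source. That is indeed somewhat complex: the more options, the more trouble.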
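
How the three _source parameters combine is easy to misread, so here is a minimal sketch that replays the logic of parseFromRestRequest() on a plain map. The class `SourceFilterSketch` and its `parse` method are hypothetical, and the method returns a readable summary string instead of a FetchSourceContext.

```java
import java.util.Arrays;
import java.util.Map;

// Hypothetical sketch: how _source, _source_includes and _source_excludes combine.
class SourceFilterSketch {
    // Returns a readable summary; null means "no source filtering was requested".
    static String parse(Map<String, String> params) {
        Boolean fetchSource = null;
        String[] includes = null;
        String[] excludes = null;

        String source = params.get("_source");
        if (source != null) {
            if (source.equals("true")) {
                fetchSource = true;
            } else if (source.equals("false")) {
                fetchSource = false;
            } else {
                includes = source.split(",");   // any other value is a list of fields to include
            }
        }
        if (params.get("_source_includes") != null) {
            includes = params.get("_source_includes").split(",");   // overrides a list given in _source
        }
        if (params.get("_source_excludes") != null) {
            excludes = params.get("_source_excludes").split(",");
        }
        if (fetchSource == null && includes == null && excludes == null) {
            return null;
        }
        boolean fetch = fetchSource == null || fetchSource;
        return "fetch=" + fetch + " includes=" + Arrays.toString(includes) + " excludes=" + Arrays.toString(excludes);
    }
}
```

For example, `_source=name,age` is treated as an include list with fetching enabled, while `_source=false` switches the source off entirely.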

The URL forms of the request can be seen in the routes that the handler registers in ES:

// org.elasticsearch.rest.action.document.RestGetAction#routes
    @Override
    public List<Route> routes() {
        return unmodifiableList(asList(
            new Route(GET, "/{index}/_doc/{id}"),
            new Route(HEAD, "/{index}/_doc/{id}"),
            // Deprecated typed endpoints.
            new Route(GET, "/{index}/{type}/{id}"),
            new Route(HEAD, "/{index}/{type}/{id}")));
    }
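
To see how a template such as /{index}/_doc/{id} turns a URL into the index and id parameters read in prepareRequest(), here is a minimal sketch. The class `RouteMatchSketch` and its `match` method are hypothetical; it compares the path segment by segment, whereas Elasticsearch resolves routes with its own path trie.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: match a request path against a template such as /{index}/_doc/{id}.
class RouteMatchSketch {
    // Returns the extracted path parameters, or null when the path does not match the template.
    static Map<String, String> match(String template, String path) {
        String[] t = template.split("/");
        String[] p = path.split("/");
        if (t.length != p.length) {
            return null;
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < t.length; i++) {
            if (t[i].startsWith("{") && t[i].endsWith("}")) {
                params.put(t[i].substring(1, t[i].length() - 1), p[i]); // placeholder: capture the value
            } else if (!t[i].equals(p[i])) {
                return null;                                            // literal segment must match exactly
            }
        }
        return params;
    }
}
```

Note that a path like /test/_doc/1 would also fit the deprecated /{index}/{type}/{id} template under this naive matching, which is one reason a real router needs a rule for preferring literal segments over placeholders.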

In the end, the request reaches an internal handler, TransportGetAction.

// org.elasticsearch.action.support.TransportAction#execute
    /**
     * Use this method when the transport action call should result in creation of a new task associated with the call.
     *
     * This is a typical behavior.
     */
    public final Task execute(Request request, ActionListener<Response> listener) {
        /*
         * While this version of execute could delegate to the TaskListener
         * version of execute that'd add yet another layer of wrapping on the
         * listener and prevent us from using the listener bare if there isn't a
         * task. That just seems like too many objects. Thus the two versions of
         * this method.
         */
        final Releasable unregisterChildNode = registerChildNode(request.getParentTask());
        final Task task;
        try {
            task = taskManager.register("transport", actionName, request);
        } catch (TaskCancelledException e) {
            unregisterChildNode.close();
            throw e;
        }
        execute(task, request, new ActionListener<Response>() {
            @Override
            public void onResponse(Response response) {
                try {
                    Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
                } finally {
                    listener.onResponse(response);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    Releasables.close(unregisterChildNode, () -> taskManager.unregister(task));
                } finally {
                    listener.onFailure(e);
                }
            }
        });
        return task;
    }

    /**
     * Use this method when the transport action should continue to run in the context of the current task
     */
    public final void execute(Task task, Request request, ActionListener<Response> listener) {
        ActionRequestValidationException validationException = request.validate();
        if (validationException != null) {
            listener.onFailure(validationException);
            return;
        }

        if (task != null && request.getShouldStoreResult()) {
            listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
        }

        RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);
        requestFilterChain.proceed(task, actionName, request, listener);
    }
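
The second execute() above does two things: it validates the request, then hands it to a filter chain whose final step is the action itself. A minimal sketch of that shape follows. Everything here is hypothetical (`FilterChainSketch`, the `Filter` interface, string requests and a `StringBuilder` used as a log); it only illustrates the control flow, not Elasticsearch's RequestFilterChain.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch: validate first, then walk a filter chain whose last step is the action itself.
class FilterChainSketch {
    // A filter sees the request and decides whether to continue the chain by calling next.
    interface Filter {
        void apply(String request, StringBuilder log, Runnable next);
    }

    static String execute(String request, List<Filter> filters, BiConsumer<String, StringBuilder> action) {
        StringBuilder log = new StringBuilder();
        if (request == null || request.isEmpty()) {
            return "validation failed";               // stands in for listener.onFailure(validationException)
        }
        proceed(0, request, filters, action, log);
        return log.toString();
    }

    private static void proceed(int i, String request, List<Filter> filters,
                                BiConsumer<String, StringBuilder> action, StringBuilder log) {
        if (i < filters.size()) {
            filters.get(i).apply(request, log, () -> proceed(i + 1, request, filters, action, log));
        } else {
            action.accept(request, log);              // end of the chain: the equivalent of doExecute()
        }
    }
}
```

With two filters that append "auth>" and "audit>" before calling next, a request "get test/1" produces the log "auth>audit>doExecute:get test/1" when the action appends "doExecute:" plus the request. A filter that never calls next stops the request before the action runs.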

TransportGetAction extends TransportSingleShardAction, which in turn extends TransportAction, so the doExecute that gets called is the one defined in TransportSingleShardAction.

// org.elasticsearch.action.support.single.shard.TransportSingleShardAction#doExecute
    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(request, listener).start();
    }
        // org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction
        private AsyncSingleAction(Request request, ActionListener<Response> listener) {
            this.listener = listener;

            ClusterState clusterState = clusterService.state();
            if (logger.isTraceEnabled()) {
                logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
            }
            nodes = clusterState.nodes();
            ClusterBlockException blockException = checkGlobalBlock(clusterState);
            if (blockException != null) {
                throw blockException;
            }

            String concreteSingleIndex;
            // resolve the index parameter
            if (resolveIndex(request)) {
                concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
            } else {
                concreteSingleIndex = request.index();
            }
            // build the internal request
            this.internalRequest = new InternalRequest(request, concreteSingleIndex);
            resolveRequest(clusterState, internalRequest);
            // check cluster blocks again, this time for the specific request
            blockException = checkRequestBlock(clusterState, internalRequest);
            if (blockException != null) {
                throw blockException;
            }
            // obtain the shard iterator for this request; the request is then sent to these shards
            this.shardIt = shards(clusterState, internalRequest);
        }
    // org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteSingleIndex
    /**
     * Utility method that allows to resolve an index expression to its corresponding single concrete index.
     * Callers should make sure they provide proper {@link org.elasticsearch.action.support.IndicesOptions}
     * that require a single index as a result. The indices resolution must in fact return a single index when
     * using this method, an {@link IllegalArgumentException} gets thrown otherwise.
     *
     * @param state             the cluster state containing all the data to resolve to expression to a concrete index
     * @param request           The request that defines how the an alias or an index need to be resolved to a concrete index
     *                          and the expression that can be resolved to an alias or an index name.
     * @throws IllegalArgumentException if the index resolution lead to more than one index
     * @return the concrete index obtained as a result of the index resolution
     */
    public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
        String indexExpression = CollectionUtils.isEmpty(request.indices()) ? null : request.indices()[0];
        Index[] indices = concreteIndices(state, request.indicesOptions(), indexExpression);
        if (indices.length != 1) {
            throw new IllegalArgumentException("unable to return a single index as the index and options" +
                " provided got resolved to multiple indices");
        }
        return indices[0];
    }

    // org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#concreteIndices
    /**
     * Translates the provided index expression into actual concrete indices, properly deduplicated.
     *
     * @param state             the cluster state containing all the data to resolve to expressions to concrete indices
     * @param options           defines how the aliases or indices need to be resolved to concrete indices
     * @param indexExpressions  expressions that can be resolved to alias or index names.
     * @return the resolved concrete indices based on the cluster state, indices options and index expressions
     * @throws IndexNotFoundException if one of the index expressions is pointing to a missing index or alias and the
     * provided indices options in the context don't allow such a case, or if the final result of the indices resolution
     * contains no indices and the indices options in the context don't allow such a case.
     * @throws IllegalArgumentException if one of the aliases resolve to multiple indices and the provided
     * indices options in the context don't allow such a case.
     */
    public Index[] concreteIndices(ClusterState state, IndicesOptions options, String... indexExpressions) {
        return concreteIndices(state, options, false, indexExpressions);
    }

    public Index[] concreteIndices(ClusterState state, IndicesOptions options, boolean includeDataStreams, String... indexExpressions) {
        Context context = new Context(state, options, false, false, includeDataStreams,
            isSystemIndexAccessAllowed());
        return concreteIndices(context, indexExpressions);
    }

    Index[] concreteIndices(Context context, String... indexExpressions) {
        if (indexExpressions == null || indexExpressions.length == 0) {
            indexExpressions = new String[]{Metadata.ALL};
        }
        Metadata metadata = context.getState().metadata();
        IndicesOptions options = context.getOptions();
        // If only one index is specified then whether we fail a request if an index is missing depends on the allow_no_indices
        // option. At some point we should change this, because there shouldn't be a reason why whether a single index
        // or multiple indices are specified yield different behaviour.
        final boolean failNoIndices = indexExpressions.length == 1 ? !options.allowNoIndices() : !options.ignoreUnavailable();
        List<String> expressions = Arrays.asList(indexExpressions);
        for (ExpressionResolver expressionResolver : expressionResolvers) {
            expressions = expressionResolver.resolve(context, expressions);
        }

        if (expressions.isEmpty()) {
            if (!options.allowNoIndices()) {
                IndexNotFoundException infe;
                if (indexExpressions.length == 1) {
                    if (indexExpressions[0].equals(Metadata.ALL)) {
                        infe = new IndexNotFoundException("no indices exist", (String)null);
                    } else {
                        infe = new IndexNotFoundException((String)null);
                    }
                } else {
                    infe = new IndexNotFoundException((String)null);
                }
                infe.setResources("index_expression", indexExpressions);
                throw infe;
            } else {
                return Index.EMPTY_ARRAY;
            }
        }

        boolean excludedDataStreams = false;
        final Set<Index> concreteIndices = new LinkedHashSet<>(expressions.size());
        for (String expression : expressions) {
            IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(expression);
            if (indexAbstraction == null ) {
                if (failNoIndices) {
                    IndexNotFoundException infe;
                    if (expression.equals(Metadata.ALL)) {
                        infe = new IndexNotFoundException("no indices exist", expression);
                    } else {
                        infe = new IndexNotFoundException(expression);
                    }
                    infe.setResources("index_expression", expression);
                    throw infe;
                } else {
                    continue;
                }
            } else if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.getOptions().ignoreAliases()) {
                if (failNoIndices) {
                    throw aliasesNotSupportedException(expression);
                } else {
                    continue;
                }
            } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
                        context.includeDataStreams() == false) {
                excludedDataStreams = true;
                continue;
            }

            if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.isResolveToWriteIndex()) {
                IndexMetadata writeIndex = indexAbstraction.getWriteIndex();
                if (writeIndex == null) {
                    throw new IllegalArgumentException("no write index is defined for alias [" + indexAbstraction.getName() + "]." +
                        " The write index may be explicitly disabled using is_write_index=false or the alias points to multiple" +
                        " indices without one being designated as a write index");
                }
                if (addIndex(writeIndex, context)) {
                    concreteIndices.add(writeIndex.getIndex());
                }
            } else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM && context.isResolveToWriteIndex()) {
                IndexMetadata writeIndex = indexAbstraction.getWriteIndex();
                if (addIndex(writeIndex, context)) {
                    concreteIndices.add(writeIndex.getIndex());
                }
            } else {
                if (indexAbstraction.getIndices().size() > 1 && !options.allowAliasesToMultipleIndices()) {
                    String[] indexNames = new String[indexAbstraction.getIndices().size()];
                    int i = 0;
                    for (IndexMetadata indexMetadata : indexAbstraction.getIndices()) {
                        indexNames[i++] = indexMetadata.getIndex().getName();
                    }
                    throw new IllegalArgumentException(indexAbstraction.getType().getDisplayName() + " [" + expression +
                        "] has more than one index associated with it " + Arrays.toString(indexNames) +
                        ", can't execute a single index op");
                }

                for (IndexMetadata index : indexAbstraction.getIndices()) {
                    if (shouldTrackConcreteIndex(context, options, index)) {
                        concreteIndices.add(index.getIndex());
                    }
                }
            }
        }

        if (options.allowNoIndices() == false && concreteIndices.isEmpty()) {
            IndexNotFoundException infe = new IndexNotFoundException((String)null);
            infe.setResources("index_expression", indexExpressions);
            if (excludedDataStreams) {
                // Allows callers to handle IndexNotFoundException differently based on whether data streams were excluded.
                infe.addMetadata(EXCLUDED_DATA_STREAMS_KEY, "true");
            }
            throw infe;
        }
        checkSystemIndexAccess(context, metadata, concreteIndices, indexExpressions);
        return concreteIndices.toArray(new Index[concreteIndices.size()]);
    }

    // org.elasticsearch.action.get.TransportGetAction#resolveRequest
    @Override
    protected void resolveRequest(ClusterState state, InternalRequest request) {
        // update the routing (request#index here is possibly an alias)
        request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
        // Fail fast on the node that received the request.
        if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
            throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
        }
    }

    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
    }

        // org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#start
        public void start() {
            if (shardIt == null) {
                // just execute it on the local node
                final Writeable.Reader<Response> reader = getResponseReader();
                transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
                    new TransportResponseHandler<Response>() {
                    @Override
                    public Response read(StreamInput in) throws IOException {
                        return reader.read(in);
                    }

                    @Override
                    public void handleResponse(final Response response) {
                        listener.onResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        listener.onFailure(exp);
                    }
                });
            } else {
                perform(null);
            }
        }
        // org.elasticsearch.action.support.single.shard.TransportSingleShardAction.AsyncSingleAction#perform
        private void perform(@Nullable final Exception currentFailure) {
            Exception lastFailure = this.lastFailure;
            if (lastFailure == null || TransportActions.isReadOverrideException(currentFailure)) {
                lastFailure = currentFailure;
                this.lastFailure = currentFailure;
            }
            final ShardRouting shardRouting = shardIt.nextOrNull();
            if (shardRouting == null) {
                Exception failure = lastFailure;
                if (failure == null || isShardNotAvailableException(failure)) {
                    failure = new NoShardAvailableActionException(null,
                        LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
                } else {
                    logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null,
                        internalRequest.request()), failure);
                }
                listener.onFailure(failure);
                return;
            }
            DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
            if (node == null) {
                onFailure(shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
            } else {
                internalRequest.request().internalShardId = shardRouting.shardId();
                if (logger.isTraceEnabled()) {
                    logger.trace(
                            "sending request [{}] to shard [{}] on node [{}]",
                            internalRequest.request(),
                            internalRequest.request().internalShardId,
                            node
                    );
                }
                final Writeable.Reader<Response> reader = getResponseReader();
                transportService.sendRequest(node, transportShardAction, internalRequest.request(),
                    new TransportResponseHandler<Response>() {

                        @Override
                        public Response read(StreamInput in) throws IOException {
                            return reader.read(in);
                        }

                        @Override
                        public void handleResponse(final Response response) {
                            listener.onResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            onFailure(shardRouting, exp);
                        }
                });
            }
        }
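
The retry logic in perform() can be reduced to a small sketch: walk the shard copies in order, try each one, remember the last failure, and report it only when no copy is left. This is a simplified model under stated assumptions, not the Elasticsearch implementation: the names ShardFailoverSketch and firstSuccessful are hypothetical, and a synchronous loop stands in for the asynchronous callbacks used in the real code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of perform(): names are illustrative, not Elasticsearch APIs.
final class ShardFailoverSketch {

    /**
     * Tries each shard copy in order and returns the first successful result.
     * When every copy has failed (or there are none), the last failure is reported.
     */
    static <C, R> R firstSuccessful(List<C> copies, Function<C, R> call) {
        RuntimeException lastFailure = null;
        for (C copy : copies) {
            try {
                return call.apply(copy);
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and move on to the next copy
            }
        }
        // No copy left: surface the last failure as the cause (null if there were no copies).
        throw new IllegalStateException("No shard available", lastFailure);
    }

    public static void main(String[] args) {
        List<String> copies = Arrays.asList("node-1", "node-2", "node-3");
        String result = firstSuccessful(copies, node -> {
            if (!node.equals("node-3")) {
                throw new IllegalStateException(node + " is unavailable");
            }
            return "document served by " + node;
        });
        System.out.println(result);
    }
}
```

Since any copy of the shard can serve a get, a failure on one copy is not fatal; the request only fails once the iterator is exhausted.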

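As its name suggests, sendRequest is meant to send a request over the network and obtain the result. For a request aimed at the local node, however, ES does not actually go out over the network. Let's use the sequence diagram of a local-node request to see how ES handles sendRequest for get.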

Figure: sendRequest call sequence diagram
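
Before the full listing, the local-node shortcut can be pictured in a few lines. The sketch below is only an illustration under stated assumptions: LocalDispatchSketch, Connection, connectionFor and registerHandler are hypothetical names, requests and responses are plain strings, and the "remote" branch merely counts calls instead of touching a socket.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of the local-node shortcut; none of these names are Elasticsearch APIs.
final class LocalDispatchSketch {

    /** A connection either crosses the network or, for the local node, calls the handler directly. */
    interface Connection {
        void send(String action, String request, Consumer<String> onResponse);
    }

    private final String localNodeId;
    private final Map<String, Function<String, String>> handlers = new HashMap<>();
    int simulatedNetworkCalls = 0;

    LocalDispatchSketch(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    void registerHandler(String action, Function<String, String> handler) {
        handlers.put(action, handler);
    }

    Connection connectionFor(String nodeId) {
        if (nodeId.equals(localNodeId)) {
            // Local node: look up the registered handler and invoke it in-process.
            return (action, request, onResponse) -> {
                Function<String, String> handler = handlers.get(action);
                if (handler == null) {
                    throw new IllegalArgumentException("Action [" + action + "] not found");
                }
                onResponse.accept(handler.apply(request));
            };
        }
        // Remote node: a real implementation would serialize the request and write it to a socket.
        return (action, request, onResponse) -> {
            simulatedNetworkCalls++;
            onResponse.accept("response from remote node " + nodeId);
        };
    }

    public static void main(String[] args) {
        LocalDispatchSketch service = new LocalDispatchSketch("node-1");
        service.registerHandler("get", id -> "document " + id);
        service.connectionFor("node-1").send("get", "42", System.out::println);
        System.out.println("network calls: " + service.simulatedNetworkCalls);
    }
}
```

The caller uses the same send call in both cases; only the connection object decides whether the network is involved.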

The concrete implementation is as follows:

// org.elasticsearch.transport.TransportService#sendRequest
    public <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                                final TransportRequest request,
                                                                final TransportResponseHandler<T> handler) {
        sendRequest(node, action, request, TransportRequestOptions.EMPTY, handler);
    }

    public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {
        final Transport.Connection connection;
        try {
            connection = getConnection(node);
        } catch (final NodeNotConnectedException ex) {
            // the caller might not handle this so we invoke the handler
            handler.handleException(ex);
            return;
        }
        sendRequest(connection, action, request, options, handler);
    }

    /**
     * Returns either a real transport connection or a local node connection if we are using the local node optimization.
     * @throws NodeNotConnectedException if the given node is not connected
     */
    public Transport.Connection getConnection(DiscoveryNode node) {
        // If the target is the local node, return the local connection directly; no remote call is needed
        if (isLocalNode(node)) {
            return localNodeConnection;
        } else {
            return connectionManager.getConnection(node);
        }
    }
    private boolean isLocalNode(DiscoveryNode discoveryNode) {
        return Objects.requireNonNull(discoveryNode, "discovery node must not be null").equals(localNode);
    }

    // org.elasticsearch.transport.TransportService#sendRequest
    /**
     * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
     *
     * @param connection the connection to send the request on
     * @param action     the name of the action
     * @param request    the request
     * @param options    the options for this request
     * @param handler    the response handler
     * @param <T>        the type of the transport response
     */
    public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                final TransportResponseHandler<T> handler) {
        try {
            final TransportResponseHandler<T> delegate;
            if (request.getParentTask().isSet()) {
                // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
                // child task on the target node of the remote cluster.
                //  ----> a parent task on the local cluster
                //        |
                //         ----> a proxy task on the proxy node on the remote cluster
                //               |
                //                ----> an actual child task on the target node on the remote cluster
                // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
                // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
                // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
                final Transport.Connection unwrappedConn = unwrapConnection(connection);
                final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
                delegate = new TransportResponseHandler<T>() {
                    @Override
                    public void handleResponse(T response) {
                        unregisterChildNode.close();
                        handler.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        unregisterChildNode.close();
                        handler.handleException(exp);
                    }

                    @Override
                    public String executor() {
                        return handler.executor();
                    }

                    @Override
                    public T read(StreamInput in) throws IOException {
                        return handler.read(in);
                    }

                    @Override
                    public String toString() {
                        return getClass().getName() + "/[" + action + "]:" + handler.toString();
                    }
                };
            } else {
                delegate = handler;
            }
            asyncSender.sendRequest(connection, action, request, options, delegate);
        } catch (final Exception ex) {
            // the caller might not handle this so we invoke the handler
            final TransportException te;
            if (ex instanceof TransportException) {
                te = (TransportException) ex;
            } else {
                te = new TransportException("failure to send", ex);
            }
            handler.handleException(te);
        }
    }

    // org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#interceptSender
    @Override
    public AsyncSender interceptSender(AsyncSender sender) {
        return new AsyncSender() {
            @Override
            public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
                                                                  TransportRequestOptions options, TransportResponseHandler<T> handler) {
                final boolean requireAuth = shouldRequireExistingAuthentication();
                // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
                // ourselves otherwise we wind up using a version newer than what we can actually send
                final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

                // Sometimes a system action gets executed like a internal create index request or update mappings request
                // which means that the user is copied over to system actions so we need to change the user
                if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
                    securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
                                    , handler), sender, requireAuth), minVersion);
                } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
                    AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
                            (original) -> sendWithUser(connection, action, request, options,
                                    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
                                            , handler), sender, requireAuth));
                } else if (securityContext.getAuthentication() != null &&
                        securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
                    // re-write the authentication since we want the authentication version to match the version of the connection
                    securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
                        new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
                        requireAuth), minVersion);
                } else {
                    sendWithUser(connection, action, request, options, handler, sender, requireAuth);
                }
            }
        };
    }
    // org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#sendWithUser
    private <T extends TransportResponse> void sendWithUser(Transport.Connection connection, String action, TransportRequest request,
                                                            TransportRequestOptions options, TransportResponseHandler<T> handler,
                                                            AsyncSender sender, final boolean requireAuthentication) {
        if (securityContext.getAuthentication() == null && requireAuthentication) {
            // we use an assertion here to ensure we catch this in our testing infrastructure, but leave the ISE for cases we do not catch
            // in tests and may be hit by a user
            assertNoAuthentication(action);
            throw new IllegalStateException("there should always be a user when sending a message for action [" + action + "]");
        }

        try {
            sender.sendRequest(connection, action, request, options, handler);
        } catch (Exception e) {
            handler.handleException(new TransportException("failed sending request", e));
        }
    }
    // org.elasticsearch.transport.TransportService#sendRequestInternal
    private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler) {
        if (connection == null) {
            throw new IllegalStateException("can't send request to a null connection");
        }
        DiscoveryNode node = connection.getNode();

        Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
        ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
        // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
        final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
        final TimeoutHandler timeoutHandler;
        if (options.timeout() != null) {
            timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
            responseHandler.setTimeoutHandler(timeoutHandler);
        } else {
            timeoutHandler = null;
        }
        try {
            if (lifecycle.stoppedOrClosed()) {
                /*
                 * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                 * caller. It will only notify if toStop hasn't done the work yet.
                 */
                throw new NodeClosedException(localNode);
            }
            if (timeoutHandler != null) {
                assert options.timeout() != null;
                timeoutHandler.scheduleTimeout(options.timeout());
            }
            // Send the request
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
        } catch (final Exception e) {
            // usually happen either because we failed to connect to the node
            // or because we failed serializing the message
            final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
            // If holderToNotify == null then handler has already been taken care of.
            if (contextToNotify != null) {
                if (timeoutHandler != null) {
                    timeoutHandler.cancel();
                }
                // callback that an exception happened, but on a different thread since we don't
                // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
                // thread on a best effort basis though.
                final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
                final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    public void onRejection(Exception e) {
                        // if we get rejected during node shutdown we don't wanna bubble it up
                        logger.debug(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on rejection, action: {}",
                                contextToNotify.action()),
                            e);
                    }
                    @Override
                    public void onFailure(Exception e) {
                        logger.warn(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on exception, action: {}",
                                contextToNotify.action()),
                            e);
                    }
                    @Override
                    protected void doRun() throws Exception {
                        contextToNotify.handler().handleException(sendRequestException);
                    }
                });
            } else {
                logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
            }
        }
    }
        // org.elasticsearch.transport.TransportService#localNodeConnection (anonymous Transport.Connection), sendRequest
        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws TransportException {
            sendLocalRequest(requestId, action, request, options);
        }
    // org.elasticsearch.transport.TransportService#sendLocalRequest
    private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
        final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
        try {
            onRequestSent(localNode, requestId, action, request, options);
            onRequestReceived(requestId, action);
            // Look up the handler registered for this action
            final RequestHandlerRegistry reg = getRequestHandler(action);
            if (reg == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final String executor = reg.getExecutor();
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked
                reg.processMessageReceived(request, channel);
            } else {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        //noinspection unchecked
                        reg.processMessageReceived(request, channel);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn(() -> new ParameterizedMessage(
                                    "failed to notify channel of error message for action [{}]", action), inner);
                        }
                    }

                    @Override
                    public String toString() {
                        return "processing of [" + requestId + "][" + action + "]: " + request;
                    }
                });
            }

        } catch (Exception e) {
            try {
                channel.sendResponse(e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.warn(
                    () -> new ParameterizedMessage(
                        "failed to notify channel of error message for action [{}]", action), inner);
            }
        }
    }
    // org.elasticsearch.transport.RequestHandlerRegistry#processMessageReceived
    public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
        final Task task = taskManager.register(channel.getChannelType(), action, request);
        Releasable unregisterTask = () -> taskManager.unregister(task);
        try {
            if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
                final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
                final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
                unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
            }
            final TaskTransportChannel taskTransportChannel = new TaskTransportChannel(channel, unregisterTask);
            handler.messageReceived(request, taskTransportChannel, task);
            unregisterTask = null;
        } finally {
            Releasables.close(unregisterTask);
        }
    }
    // org.elasticsearch.action.get.TransportGetAction#asyncShardOperation
    @Override
    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        if (request.realtime()) { // we are not tied to a refresh cycle here anyway
            super.asyncShardOperation(request, shardId, listener);
        } else {
            indexShard.awaitShardSearchActive(b -> {
                try {
                    super.asyncShardOperation(request, shardId, listener);
                } catch (Exception ex) {
                    listener.onFailure(ex);
                }
            });
        }
    }
    // org.elasticsearch.action.support.single.shard.TransportSingleShardAction#asyncShardOperation
    protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
        threadPool.executor(getExecutor(request, shardId))
            .execute(ActionRunnable.supply(listener, () -> shardOperation(request, shardId)));
    }

    // org.elasticsearch.action.get.TransportGetAction#shardOperation
    // This method is where the get operation is actually carried out
    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        if (request.refresh() && !request.realtime()) {
            indexShard.refresh("refresh_flag_get");
        }
        // Core get implementation
        GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
        // Wrap the result in a GetResponse so it can be written to the response
        return new GetResponse(result);
    }

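The above is the processing framework for get. The basic idea: check whether a routing value is configured; if so, use it to locate the shard, and if not, use the document id as the routing value, then send the request to a node holding a copy of that shard to fetch the data. The forwarding that precedes the actual get looks rather complex because it has to cover many remote-call cases and be reusable. ES makes heavy use of the wrapper pattern and the observer pattern along the way, which is worth noting and learning from. The real core of get is the shardOperation() method. How does it interact with Lucene to fetch the data? We will see shortly.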
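
The wrapper and observer patterns mentioned above show up together in the response handlers: a listener is notified of the outcome, and a wrapper adds a step (such as releasing a resource) before forwarding to it. The following is a minimal sketch of that combination, not Elasticsearch code; HandlerWrapperSketch, ResponseHandler and withCleanup are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the wrapper + listener combination; not Elasticsearch code.
final class HandlerWrapperSketch {

    /** The "observer": whoever sent the request is notified through these callbacks. */
    interface ResponseHandler<T> {
        void handleResponse(T response);
        void handleException(Exception e);
    }

    /** The "wrapper": runs a cleanup step, then forwards the event to the wrapped handler. */
    static <T> ResponseHandler<T> withCleanup(ResponseHandler<T> delegate, Runnable cleanup) {
        return new ResponseHandler<T>() {
            @Override
            public void handleResponse(T response) {
                cleanup.run();
                delegate.handleResponse(response);
            }

            @Override
            public void handleException(Exception e) {
                cleanup.run();
                delegate.handleException(e);
            }
        };
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        ResponseHandler<String> listener = new ResponseHandler<String>() {
            @Override
            public void handleResponse(String response) {
                events.add("response: " + response);
            }

            @Override
            public void handleException(Exception e) {
                events.add("failure: " + e.getMessage());
            }
        };
        ResponseHandler<String> wrapped = withCleanup(listener, () -> events.add("cleanup"));
        wrapped.handleResponse("doc-1");
        System.out.println(events);
    }
}
```

Because the wrapper implements the same interface as the handler it wraps, layers of this kind can be stacked without the sender or the final listener knowing about them.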

Routing is a major feature of ES, and the shard is one of its core concepts. Next, let's look at two details: how get handles routing, and how get handles shards.
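
The shard lookup boils down to a hash and a modulo. The sketch below mirrors the arithmetic of generateShardId and calculateScaledShardId for a non-partitioned index (partition offset 0). It rests on one loud assumption: Elasticsearch hashes with its own Murmur3HashFunction, while this sketch substitutes String.hashCode() to stay self-contained, so the shard numbers it prints will not match a real cluster. ShardRoutingSketch and its methods are hypothetical names.

```java
// Hypothetical sketch of routing-to-shard mapping for a non-partitioned index.
final class ShardRoutingSketch {

    // Stand-in hash. Elasticsearch uses its own Murmur3HashFunction; String.hashCode()
    // is used here only to keep the sketch self-contained, so the numbers will differ.
    static int hash(String value) {
        return value.hashCode();
    }

    static int shardId(String id, String routing, int routingNumShards, int routingFactor) {
        // Without an explicit routing value, the document id is the routing key.
        String effectiveRouting = (routing == null) ? id : routing;
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash(effectiveRouting), routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Routed by id only.
        System.out.println(shardId("1", null, 5, 1));
        // Two different ids with the same routing value land on the same shard.
        System.out.println(shardId("1", "user123", 5, 1));
        System.out.println(shardId("2", "user123", 5, 1));
    }
}
```

One consequence is that a document indexed with a custom routing value has to be fetched with the same value, otherwise the get is sent to the shard derived from the id alone.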

// Routing handling; example request: curl -XGET 'http://localhost:9200/test_index/job/1?routing=user123'
    // org.elasticsearch.action.get.TransportGetAction#resolveRequest
    @Override
    protected void resolveRequest(ClusterState state, InternalRequest request) {
        // update the routing (request#index here is possibly an alias)
        // Resolve the default routing (e.g. from an alias), or validate the routing value passed in
        request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
        // Fail fast on the node that received the request.
        if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
            throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
        }
    }
    // org.elasticsearch.cluster.metadata.Metadata#resolveIndexRouting
    /**
     * Returns indexing routing for the given index.
     */
    // TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
    // in the index,bulk,update and delete apis.
    public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
        if (aliasOrIndex == null) {
            return routing;
        }
        // Look up the index abstraction to determine whether the name is an alias
        // If it is not an alias, the routing value is returned unchanged
        IndexAbstraction result = getIndicesLookup().get(aliasOrIndex);
        if (result == null || result.getType() != IndexAbstraction.Type.ALIAS) {
            return routing;
        }
        IndexAbstraction.Alias alias = (IndexAbstraction.Alias) result;
        if (result.getIndices().size() > 1) {
            rejectSingleIndexOperation(aliasOrIndex, result);
        }
        AliasMetadata aliasMd = alias.getFirstAliasMetadata();
        if (aliasMd.indexRouting() != null) {
            // An alias is not allowed to carry more than one routing key
            if (aliasMd.indexRouting().indexOf(',') != -1) {
                throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" +
                    aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
            }
            if (routing != null) {
                if (!routing.equals(aliasMd.indexRouting())) {
                    throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it [" +
                        aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation");
                }
            }
            // Alias routing overrides the parent routing (if any).
            return aliasMd.indexRouting();
        }
        return routing;
    }

    // Shard handling
    // org.elasticsearch.action.get.TransportGetAction#shards
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
    }
    // org.elasticsearch.cluster.routing.OperationRouting#getShards
    public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
                                   @Nullable String preference) {
        // Get the shard routing table first, then build the shard iterator
        return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
            clusterState.nodes(), preference, null, null);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#shards
    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
        int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
        return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#indexMetadata
    protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {
        IndexMetadata indexMetadata = clusterState.metadata().index(index);
        if (indexMetadata == null) {
            throw new IndexNotFoundException(index);
        }
        return indexMetadata;
    }
    // Generate the shardId
    // org.elasticsearch.cluster.routing.OperationRouting#generateShardId
    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        if (routing == null) {
            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        if (indexMetadata.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#calculateScaledShardId
    private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash documents
        // routingFactor defaults to 1
        return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
    }
    // org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable(java.lang.String, int)
    /**
     * All shards for the provided index and shard id
     * @return All the shard routing entries for the given index and shard id
     * @throws IndexNotFoundException if provided index does not exist
     * @throws ShardNotFoundException if provided shard id is unknown
     */
    public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {
        IndexRoutingTable indexRouting = index(index);
        if (indexRouting == null) {
            throw new IndexNotFoundException(index);
        }
        return shardRoutingTable(indexRouting, shardId);
    }
    // org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable
    /**
     * Get's the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable}
     * or throws a {@link ShardNotFoundException} if no shard by the given id is found in the IndexRoutingTable.
     *
     * @param indexRouting IndexRoutingTable
     * @param shardId ShardId
     * @return IndexShardRoutingTable
     */
    public static IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {
        IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
        if (indexShard == null) {
            throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
        }
        return indexShard;
    }
    // org.elasticsearch.cluster.routing.OperationRouting#preferenceActiveShardIterator
    private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                        DiscoveryNodes nodes, @Nullable String preference,
                                                        @Nullable ResponseCollectorService collectorService,
                                                        @Nullable Map<String, Long> nodeCounts) {
        if (preference == null || preference.isEmpty()) {
            return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
        }
        if (preference.charAt(0) == '_') {
            Preference preferenceType = Preference.parse(preference);
            if (preferenceType == Preference.SHARDS) {
                // starts with _shards, so execute on specific ones
                int index = preference.indexOf('|');

                String shards;
                if (index == -1) {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1);
                } else {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
                }
                String[] ids = Strings.splitStringByCommaToArray(shards);
                boolean found = false;
                for (String id : ids) {
                    if (Integer.parseInt(id) == indexShard.shardId().id()) {
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    return null;
                }
                // no more preference
                if (index == -1 || index == preference.length() - 1) {
                    return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
                } else {
                    // update the preference and continue
                    preference = preference.substring(index + 1);
                }
            }
            preferenceType = Preference.parse(preference);
            switch (preferenceType) {
                case PREFER_NODES:
                    final Set<String> nodesIds =
                            Arrays.stream(
                                    preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                            ).collect(Collectors.toSet());
                    return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
                case LOCAL:
                    return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
                case ONLY_LOCAL:
                    return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
                case ONLY_NODES:
                    String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                    return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
                default:
                    throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
            }
        }
        // if not, then use it as the index
        int routingHash = Murmur3HashFunction.hash(preference);
        if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
            // The AllocationService lists shards in a fixed order based on nodes
            // so earlier versions of this class would have a tendency to
            // select the same node across different shardIds.
            // Better overall balancing can be achieved if each shardId opts
            // for a different element in the list by also incorporating the
            // shard ID into the hash of the user-supplied preference key.
            routingHash = 31 * routingHash + indexShard.shardId.hashCode();
        }
        if (awarenessAttributes.isEmpty()) {
            return indexShard.activeInitializingShardsIt(routingHash);
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
        }
    }

    private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes,
            @Nullable ResponseCollectorService collectorService, @Nullable Map<String, Long> nodeCounts) {
        if (awarenessAttributes.isEmpty()) {
            if (useAdaptiveReplicaSelection) {
                return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
            } else {
                return indexShard.activeInitializingShardsRandomIt();
            }
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
        }
    }
    // org.elasticsearch.cluster.routing.IndexShardRoutingTable#activeInitializingShardsRankedIt
    /**
     * Returns an iterator over active and initializing shards, ordered by the adaptive replica
     * selection formula. Making sure though that its random within the active shards of the same
     * (or missing) rank, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,
                                                          @Nullable Map<String, Long> nodeSearchCounts) {
        final int seed = shuffler.nextSeed();
        if (allInitializingShards.isEmpty()) {
            return new PlainShardIterator(shardId,
                    rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
        }

        ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
        List<ShardRouting> rankedActiveShards =
                rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
        ordered.addAll(rankedActiveShards);
        List<ShardRouting> rankedInitializingShards =
                rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
        ordered.addAll(rankedInitializingShards);
        return new PlainShardIterator(shardId, ordered);
    }

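The above is the processing framework of get. The basic idea: first check whether a routing rule is configured; if so, follow it; if not, send the request directly to one of the nodes holding the data. The forwarding that precedes the actual get looks fairly complex because it has to account for many remote calls and for code reuse. ES makes heavy use of the wrapper pattern and the observer pattern here, which is worth noting and learning from. In the end, the real core of get is the shardOperation() method. How does it interact with Lucene to fetch the data? We will see shortly.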
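To make the `_shards:...|...` handling in preferenceActiveShardIterator easier to follow, here is a minimal standalone sketch of just that string handling. The class ShardsPreferenceSketch and the method remainingPreference are hypothetical names made up for illustration; they are not part of Elasticsearch, and the sketch uses an Optional where the real method returns null or a shard iterator.

```java
import java.util.Arrays;
import java.util.Optional;

/** Hypothetical illustration only; this class does not exist in Elasticsearch. */
final class ShardsPreferenceSketch {

    // Assumed prefix: the "_shards" preference type followed by ':'.
    private static final String SHARDS_PREFIX = "_shards:";

    /**
     * Returns Optional.empty() when shardId is not listed in the _shards clause.
     * Otherwise returns the preference that is left after the '|' separator,
     * or "" when nothing follows the shard list.
     */
    static Optional<String> remainingPreference(String preference, int shardId) {
        if (!preference.startsWith(SHARDS_PREFIX)) {
            return Optional.of(preference); // no _shards clause, nothing to strip
        }
        int bar = preference.indexOf('|');
        String list = bar == -1
                ? preference.substring(SHARDS_PREFIX.length())
                : preference.substring(SHARDS_PREFIX.length(), bar);
        boolean listed = Arrays.stream(list.split(","))
                .map(String::trim)
                .mapToInt(Integer::parseInt)
                .anyMatch(id -> id == shardId);
        if (!listed) {
            return Optional.empty(); // this shard was not requested
        }
        if (bar == -1 || bar == preference.length() - 1) {
            return Optional.of(""); // no further preference after the shard list
        }
        return Optional.of(preference.substring(bar + 1));
    }

    public static void main(String[] args) {
        System.out.println(remainingPreference("_shards:2,3|_local", 3));
        System.out.println(remainingPreference("_shards:2,3|_local", 1));
    }
}
```

In the real code the remaining preference (for example `_local`) is then parsed again and dispatched through the switch statement, while an empty remainder falls back to the default shard ordering.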

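Routing is an important feature of ES, and the shard is one of its core concepts. Next, let's look at two details: how an ES get resolves routing, and how it resolves the shard.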
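As a rough orientation before reading the source, the decision made by Metadata#resolveIndexRouting for an alias can be condensed into a few lines. This is only a sketch under simplifying assumptions: AliasRoutingSketch and resolveRouting are hypothetical names, the alias lookup is replaced by a plain aliasRouting argument (null meaning "concrete index, or alias without routing"), and the multi-index alias check is left out.

```java
/** Hypothetical illustration only; this class does not exist in Elasticsearch. */
final class AliasRoutingSketch {

    /**
     * @param requestRouting routing passed on the request, may be null
     * @param aliasRouting   index routing configured on the alias, or null if there is none
     * @return the routing value that should actually be used, may be null
     */
    static String resolveRouting(String requestRouting, String aliasRouting) {
        if (aliasRouting == null) {
            // Concrete index, or alias without routing: keep whatever the request carried.
            return requestRouting;
        }
        if (aliasRouting.indexOf(',') != -1) {
            // A single-document operation cannot target several routing values.
            throw new IllegalArgumentException("alias resolves to several routing values");
        }
        if (requestRouting != null && !requestRouting.equals(aliasRouting)) {
            // The request and the alias disagree, so the operation is rejected.
            throw new IllegalArgumentException("request routing conflicts with alias routing");
        }
        // The alias routing overrides a missing request routing.
        return aliasRouting;
    }

    public static void main(String[] args) {
        System.out.println(resolveRouting(null, "user123"));
        System.out.println(resolveRouting("user123", null));
    }
}
```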

// Resolving the routing. Example request: curl -XGET 'http://localhost:9200/test_index/job/1?routing=user123'
    // org.elasticsearch.action.get.TransportGetAction#resolveRequest
    @Override
    protected void resolveRequest(ClusterState state, InternalRequest request) {
        // update the routing (request#index here is possibly an alias)
        // Resolve the effective routing: use the alias's routing if it has one, and validate any routing passed in
        request.request().routing(state.metadata().resolveIndexRouting(request.request().routing(), request.request().index()));
        // Fail fast on the node that received the request.
        if (request.request().routing() == null && state.getMetadata().routingRequired(request.concreteIndex())) {
            throw new RoutingMissingException(request.concreteIndex(), request.request().type(), request.request().id());
        }
    }
    // org.elasticsearch.cluster.metadata.Metadata#resolveIndexRouting
    /**
     * Returns indexing routing for the given index.
     */
    // TODO: This can be moved to IndexNameExpressionResolver too, but this means that we will support wildcards and other expressions
    // in the index,bulk,update and delete apis.
    public String resolveIndexRouting(@Nullable String routing, String aliasOrIndex) {
        if (aliasOrIndex == null) {
            return routing;
        }
        // Look up the index abstraction to decide whether the name is an alias
        // If it is not an alias, the routing is returned unchanged
        IndexAbstraction result = getIndicesLookup().get(aliasOrIndex);
        if (result == null || result.getType() != IndexAbstraction.Type.ALIAS) {
            return routing;
        }
        IndexAbstraction.Alias alias = (IndexAbstraction.Alias) result;
        if (result.getIndices().size() > 1) {
            rejectSingleIndexOperation(aliasOrIndex, result);
        }
        AliasMetadata aliasMd = alias.getFirstAliasMetadata();
        if (aliasMd.indexRouting() != null) {
            // An alias must not resolve to more than one routing key
            if (aliasMd.indexRouting().indexOf(',') != -1) {
                throw new IllegalArgumentException("index/alias [" + aliasOrIndex + "] provided with routing value [" +
                    aliasMd.getIndexRouting() + "] that resolved to several routing values, rejecting operation");
            }
            if (routing != null) {
                if (!routing.equals(aliasMd.indexRouting())) {
                    throw new IllegalArgumentException("Alias [" + aliasOrIndex + "] has index routing associated with it [" +
                        aliasMd.indexRouting() + "], and was provided with routing value [" + routing + "], rejecting operation");
                }
            }
            // Alias routing overrides the parent routing (if any).
            return aliasMd.indexRouting();
        }
        return routing;
    }

    // Resolving the shard
    // org.elasticsearch.action.get.TransportGetAction#shards
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
    }
    // org.elasticsearch.cluster.routing.OperationRouting#getShards
    public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
                                   @Nullable String preference) {
        // First resolve the routing table of the target shard, then build the shard iterator
        return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
            clusterState.nodes(), preference, null, null);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#shards
    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
        int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
        return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#indexMetadata
    protected IndexMetadata indexMetadata(ClusterState clusterState, String index) {
        IndexMetadata indexMetadata = clusterState.metadata().index(index);
        if (indexMetadata == null) {
            throw new IndexNotFoundException(index);
        }
        return indexMetadata;
    }
    // Compute the shardId
    // org.elasticsearch.cluster.routing.OperationRouting#generateShardId
    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        if (routing == null) {
            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        if (indexMetadata.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
    }
    // org.elasticsearch.cluster.routing.OperationRouting#calculateScaledShardId
    private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash documents
        // routingFactor = routingNumShards / numberOfShards, so it is 1 only when the two are equal
        return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
    }
    // org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable(java.lang.String, int)
    /**
     * All shards for the provided index and shard id
     * @return All the shard routing entries for the given index and shard id
     * @throws IndexNotFoundException if provided index does not exist
     * @throws ShardNotFoundException if provided shard id is unknown
     */
    public IndexShardRoutingTable shardRoutingTable(String index, int shardId) {
        IndexRoutingTable indexRouting = index(index);
        if (indexRouting == null) {
            throw new IndexNotFoundException(index);
        }
        return shardRoutingTable(indexRouting, shardId);
    }
    // org.elasticsearch.cluster.routing.RoutingTable#shardRoutingTable
    /**
     * Get's the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable}
     * or throws a {@link ShardNotFoundException} if no shard by the given id is found in the IndexRoutingTable.
     *
     * @param indexRouting IndexRoutingTable
     * @param shardId ShardId
     * @return IndexShardRoutingTable
     */
    public static IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) {
        IndexShardRoutingTable indexShard = indexRouting.shard(shardId);
        if (indexShard == null) {
            throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId));
        }
        return indexShard;
    }
    // org.elasticsearch.cluster.routing.OperationRouting#preferenceActiveShardIterator
    private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String localNodeId,
                                                        DiscoveryNodes nodes, @Nullable String preference,
                                                        @Nullable ResponseCollectorService collectorService,
                                                        @Nullable Map<String, Long> nodeCounts) {
        if (preference == null || preference.isEmpty()) {
            return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
        }
        if (preference.charAt(0) == '_') {
            Preference preferenceType = Preference.parse(preference);
            if (preferenceType == Preference.SHARDS) {
                // starts with _shards, so execute on specific ones
                int index = preference.indexOf('|');

                String shards;
                if (index == -1) {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1);
                } else {
                    shards = preference.substring(Preference.SHARDS.type().length() + 1, index);
                }
                String[] ids = Strings.splitStringByCommaToArray(shards);
                boolean found = false;
                for (String id : ids) {
                    if (Integer.parseInt(id) == indexShard.shardId().id()) {
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    return null;
                }
                // no more preference
                if (index == -1 || index == preference.length() - 1) {
                    return shardRoutings(indexShard, nodes, collectorService, nodeCounts);
                } else {
                    // update the preference and continue
                    preference = preference.substring(index + 1);
                }
            }
            preferenceType = Preference.parse(preference);
            switch (preferenceType) {
                case PREFER_NODES:
                    final Set<String> nodesIds =
                            Arrays.stream(
                                    preference.substring(Preference.PREFER_NODES.type().length() + 1).split(",")
                            ).collect(Collectors.toSet());
                    return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
                case LOCAL:
                    return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
                case ONLY_LOCAL:
                    return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
                case ONLY_NODES:
                    String nodeAttributes = preference.substring(Preference.ONLY_NODES.type().length() + 1);
                    return indexShard.onlyNodeSelectorActiveInitializingShardsIt(nodeAttributes.split(","), nodes);
                default:
                    throw new IllegalArgumentException("unknown preference [" + preferenceType + "]");
            }
        }
        // if not, then use it as the index
        int routingHash = Murmur3HashFunction.hash(preference);
        if (nodes.getMinNodeVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
            // The AllocationService lists shards in a fixed order based on nodes
            // so earlier versions of this class would have a tendency to
            // select the same node across different shardIds.
            // Better overall balancing can be achieved if each shardId opts
            // for a different element in the list by also incorporating the
            // shard ID into the hash of the user-supplied preference key.
            routingHash = 31 * routingHash + indexShard.shardId.hashCode();
        }
        if (awarenessAttributes.isEmpty()) {
            return indexShard.activeInitializingShardsIt(routingHash);
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, routingHash);
        }
    }

    private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes,
            @Nullable ResponseCollectorService collectorService, @Nullable Map<String, Long> nodeCounts) {
        if (awarenessAttributes.isEmpty()) {
            if (useAdaptiveReplicaSelection) {
                return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts);
            } else {
                return indexShard.activeInitializingShardsRandomIt();
            }
        } else {
            return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes);
        }
    }
    // org.elasticsearch.cluster.routing.IndexShardRoutingTable#activeInitializingShardsRankedIt
    /**
     * Returns an iterator over active and initializing shards, ordered by the adaptive replica
     * selection formula. Making sure though that its random within the active shards of the same
     * (or missing) rank, and initializing shards are the last to iterate through.
     */
    public ShardIterator activeInitializingShardsRankedIt(@Nullable ResponseCollectorService collector,
                                                          @Nullable Map<String, Long> nodeSearchCounts) {
        final int seed = shuffler.nextSeed();
        if (allInitializingShards.isEmpty()) {
            return new PlainShardIterator(shardId,
                    rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts));
        }

        ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
        List<ShardRouting> rankedActiveShards =
                rankShardsAndUpdateStats(shuffler.shuffle(activeShards, seed), collector, nodeSearchCounts);
        ordered.addAll(rankedActiveShards);
        List<ShardRouting> rankedInitializingShards =
                rankShardsAndUpdateStats(allInitializingShards, collector, nodeSearchCounts);
        ordered.addAll(rankedInitializingShards);
        return new PlainShardIterator(shardId, ordered);
    }
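The arithmetic in calculateScaledShardId is easier to see with concrete numbers. The sketch below is a simplified, hypothetical helper (ShardIdSketch is not an Elasticsearch class): it takes an already computed hash as a plain int instead of calling Murmur3HashFunction, ignores the partition offset used by routing-partitioned indices, and the shard counts in the example are made up.

```java
/** Hypothetical illustration only; this class does not exist in Elasticsearch. */
final class ShardIdSketch {

    /**
     * @param routingHash      hash of the effective routing (the id when no routing is given);
     *                         real ES derives it with Murmur3, here it is simply passed in
     * @param numberOfShards   current number of primary shards
     * @param routingNumShards number of routing shards used for hashing (a multiple of numberOfShards)
     */
    static int shardId(int routingHash, int numberOfShards, int routingNumShards) {
        if (numberOfShards <= 0 || routingNumShards % numberOfShards != 0) {
            throw new IllegalArgumentException("routingNumShards must be a positive multiple of numberOfShards");
        }
        int routingFactor = routingNumShards / numberOfShards;
        // floorMod keeps the result non-negative even for negative hashes;
        // dividing by routingFactor maps a block of routing shards onto one real shard.
        return Math.floorMod(routingHash, routingNumShards) / routingFactor;
    }

    public static void main(String[] args) {
        // Assumed example: 5 primary shards hashed over 640 routing shards (factor 128).
        System.out.println(shardId(1000, 5, 640));
        System.out.println(shardId(-1, 5, 640));
    }
}
```

Because the hash is taken modulo routingNumShards rather than the current shard count, a document keeps mapping to a predictable shard when the index is shrunk or split by a factor of routingFactor.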

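4. Internal logic of get

It is implemented as a synchronous call in shardOperation() of TransportGetAction:

First, an overall view of the call sequence:

[Figure: core sequence diagram of the get lookup]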
// org.elasticsearch.action.get.TransportGetAction#shardOperation
    // This is the method that actually performs the get operation
    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());

        if (request.refresh() && !request.realtime()) {
            indexShard.refresh("refresh_flag_get");
        }
        // The core get implementation
        GetResult result = indexShard.getService().get(request.type(), request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
        return new GetResponse(result);
    }
    // org.elasticsearch.indices.IndicesService#indexServiceSafe
    /**
     * Returns an IndexService for the specified index if exists otherwise a {@link IndexNotFoundException} is thrown.
     */
    public IndexService indexServiceSafe(Index index) {
        IndexService indexService = indices.get(index.getUUID());
        if (indexService == null) {
            throw new IndexNotFoundException(index);
        }
        assert indexService.indexUUID().equals(index.getUUID()) : "uuid mismatch local: " + indexService.indexUUID() +
            " incoming: " + index.getUUID();
        return indexService;
    }
    // org.elasticsearch.index.IndexService#getShard
    /**
     * Return the shard with the provided id, or throw an exception if it doesn't exist.
     */
    public IndexShard getShard(int shardId) {
        IndexShard indexShard = getShardOrNull(shardId);
        if (indexShard == null) {
            throw new ShardNotFoundException(new ShardId(index(), shardId));
        }
        return indexShard;
    }

    // org.elasticsearch.index.shard.IndexShard#getService
    public ShardGetService getService() {
        return this.getService;
    }

    // org.elasticsearch.index.get.ShardGetService#get
    public GetResult get(String type, String id, String[] gFields, boolean realtime, long version,
                            VersionType versionType, FetchSourceContext fetchSourceContext) {
        return
            get(type, id, gFields, realtime, version, versionType, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, fetchSourceContext);
    }
    // org.elasticsearch.index.get.ShardGetService#get
    private GetResult get(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        currentMetric.inc();
        try {
            long now = System.nanoTime();
            GetResult getResult =
                innerGet(type, id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);

            if (getResult.isExists()) {
                existsMetric.inc(System.nanoTime() - now);
            } else {
                missingMetric.inc(System.nanoTime() - now);
            }
            return getResult;
        } finally {
            currentMetric.dec();
        }
    }
    // org.elasticsearch.index.get.ShardGetService#innerGet
    private GetResult innerGet(String type, String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
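        // Normalize the source-fetch settings based on the requested fields
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
        // Typeless get: when no type (or "_all") is given, fall back to the index's single mapping type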
        if (type == null || type.equals("_all")) {
            DocumentMapper mapper = mapperService.documentMapper();
            type = mapper == null ? null : mapper.type();
        }

        Engine.GetResult get = null;
        if (type != null) {
            Term uidTerm = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
            // Build an Engine.Get with chained setters and call get() to look up the doc
            // indexShard was derived from the shardId earlier; this relies on a doc living on exactly one shard
            get = indexShard.get(new Engine.Get(realtime, realtime, type, id, uidTerm)
                .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
            assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
            if (get.exists() == false) {
                get.close();
            }
        }

        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), type, id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }

        try {
            // break between having loaded it from translog (so we only have _source), and having a document to load
            // Load the concrete document content (stored fields / _source) for the doc that was found
            return innerGetLoadFromStoredFields(type, id, gFields, fetchSourceContext, get, mapperService);
        } finally {
            get.close();
        }
    }

    /**
     * decides what needs to be done based on the request input and always returns a valid non-null FetchSourceContext
     */
    private FetchSourceContext normalizeFetchSourceContent(@Nullable FetchSourceContext context, @Nullable String[] gFields) {
        if (context != null) {
            return context;
        }
        if (gFields == null) {
            return FetchSourceContext.FETCH_SOURCE;
        }
        for (String field : gFields) {
            if (SourceFieldMapper.NAME.equals(field)) {
                return FetchSourceContext.FETCH_SOURCE;
            }
        }
        return FetchSourceContext.DO_NOT_FETCH_SOURCE;
    }
    // org.elasticsearch.index.shard.IndexShard#get
    public Engine.GetResult get(Engine.Get get) {
        // Check that the current state of this shard permits reads (this is the shard state, not the cluster health)
        readAllowed();
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper == null || mapper.type().equals(mapperService.resolveDocumentType(get.type())) == false) {
            return GetResult.NOT_EXISTS;
        }
        // wrapSearcher wraps the searcher that is used for the lookup
        return getEngine().get(get, mapper, this::wrapSearcher);
    }
    // org.elasticsearch.index.shard.IndexShard#readAllowed
    public void readAllowed() throws IllegalIndexShardStateException {
        IndexShardState state = this.state; // one time volatile read
        if (readAllowedStates.contains(state) == false) {
            throw new IllegalIndexShardStateException(shardId, state, "operations only allowed when shard state is one of " +
                readAllowedStates.toString());
        }
    }
    // org.elasticsearch.index.shard.IndexShard#getEngine
    Engine getEngine() {
        Engine engine = getEngineOrNull();
        if (engine == null) {
            throw new AlreadyClosedException("engine is closed");
        }
        return engine;
    }
    /**
     * NOTE: returns null if engine is not yet started (e.g. recovery phase 1, copying over index files, is still running), or if engine is
     * closed.
     */
    protected Engine getEngineOrNull() {
        return this.currentEngineReference.get();
    }

    private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
        assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
            != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
        boolean success = false;
        try {
            final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);
            assert newSearcher != null;
            success = true;
            return newSearcher;
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to wrap searcher", ex);
        } finally {
            if (success == false) {
                Releasables.close(success, searcher);
            }
        }
    }


    // org.elasticsearch.index.engine.InternalEngine#get
    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        // Concurrency control via the read lock of a ReentrantReadWriteLock
        try (ReleasableLock ignored = readLock.acquire()) {
            // Make sure the engine has not been closed
            ensureOpen();
            if (get.realtime()) {
                final VersionValue versionValue;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                // If the version map has no entry for this id, the whole block below is skipped
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                        get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                    )) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    }
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
            } else {
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            }
        }
    }
    // org.elasticsearch.index.engine.Engine#ensureOpen()
    protected final void ensureOpen() {
        ensureOpen(null);
    }
    protected final void ensureOpen(Exception suppressed) {
        if (isClosed.get()) {
            AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
            if (suppressed != null) {
                ace.addSuppressed(suppressed);
            }
            throw ace;
        }
    }
    // org.elasticsearch.index.engine.LiveVersionMap#acquireLock
    /**
     * Acquires a releaseable lock for the given uId. All *UnderLock methods require
     * this lock to be hold by the caller otherwise the visibility guarantees of this version
     * map are broken. We assert on this lock to be hold when calling these methods.
     * @see KeyedLock
     */
    Releasable acquireLock(BytesRef uid) {
        return keyedLock.acquire(uid);
    }
    // org.elasticsearch.common.util.concurrent.KeyedLock#acquire
    /**
     * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
     * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
     */
    public Releasable acquire(T key) {
        while (true) {
            KeyLock perNodeLock = map.get(key);
            if (perNodeLock == null) {
                ReleasableLock newLock = tryCreateNewLock(key);
                if (newLock != null) {
                    return newLock;
                }
            } else {
                assert perNodeLock != null;
                int i = perNodeLock.count.get();
                // The lock is ultimately a ReentrantLock; KeyLock adds the reference count
                if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
                    perNodeLock.lock();
                    return new ReleasableLock(key, perNodeLock);
                }
            }
        }
    }

    // org.elasticsearch.common.util.concurrent.KeyedLock#tryCreateNewLock
    private ReleasableLock tryCreateNewLock(T key) {
        // KeyLock extends ReentrantLock
        KeyLock newLock = new KeyLock(fair);
        newLock.lock();
        KeyLock keyLock = map.putIfAbsent(key, newLock);
        if (keyLock == null) {
            return new ReleasableLock(key, newLock);
        }
        return null;
    }
    // org.elasticsearch.index.engine.InternalEngine#getVersionFromMap
    private VersionValue getVersionFromMap(BytesRef id) {
        if (versionMap.isUnsafe()) {
            synchronized (versionMap) {
                // we are switching from an unsafe map to a safe map. This might happen concurrently
                // but we only need to do this once since the last operation per ID is to add to the version
                // map so once we pass this point we can safely lookup from the version map.
                if (versionMap.isUnsafe()) {
                    refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
                }
                versionMap.enforceSafeAccess();
            }
        }
        return versionMap.getUnderLock(id);
    }
    // org.elasticsearch.index.engine.LiveVersionMap#getUnderLock(org.apache.lucene.util.BytesRef)
    /**
     * Returns the live version (add or delete) for this uid.
     */
    VersionValue getUnderLock(final BytesRef uid) {
        return getUnderLock(uid, maps);
    }

    private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
        assert assertKeyedLockHeldByCurrentThread(uid);
        // First try to get the "live" value:
        VersionValue value = currentMaps.current.get(uid);
        if (value != null) {
            return value;
        }

        value = currentMaps.old.get(uid);
        if (value != null) {
            return value;
        }
        // Finally fall back to the tombstones (recent deletes); this returns null if the uid is not tracked at all
        return tombstones.get(uid);
    }
    // org.elasticsearch.index.engine.Engine#acquireSearcher
    public Searcher acquireSearcher(String source, SearcherScope scope, Function<Searcher, Searcher> wrapper) throws EngineException {
        SearcherSupplier releasable = null;
        try {
            // Acquire a searcher; on the get path, source = "realtime_get"
            SearcherSupplier reader = releasable = acquireSearcherSupplier(wrapper, scope);
            Searcher searcher = reader.acquireSearcher(source);
            releasable = null;
            return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),
                searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader));
        } finally {
            Releasables.close(releasable);
        }
    }
    // org.elasticsearch.index.engine.Engine#acquireSearcherSupplier
    /**
     * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
     */
    public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
        /* Acquire order here is store -> manager since we need
         * to make sure that the store is not closed before
         * the searcher is acquired. */
        if (store.tryIncRef() == false) {
            throw new AlreadyClosedException(shardId + " store is closed", failedEngine.get());
        }
        Releasable releasable = store::decRef;
        try {
            // Acquire the reader from the reference manager, which increments its reference count
            ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
            ElasticsearchDirectoryReader acquire = referenceManager.acquire();
            SearcherSupplier reader = new SearcherSupplier(wrapper) {
                @Override
                public Searcher acquireSearcherInternal(String source) {
                    assert assertSearcherIsWarmedUp(source, scope);
                    return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(),
                        engineConfig.getQueryCachingPolicy(), () -> {});
                }

                @Override
                protected void doClose() {
                    try {
                        referenceManager.release(acquire);
                    } catch (IOException e) {
                        throw new UncheckedIOException("failed to close", e);
                    } catch (AlreadyClosedException e) {
                        // This means there's a bug somewhere: don't suppress it
                        throw new AssertionError(e);
                    } finally {
                        store.decRef();
                    }
                }
            };
            releasable = null; // success - hand over the reference to the engine reader
            return reader;
        } catch (AlreadyClosedException ex) {
            throw ex;
        } catch (Exception ex) {
            maybeFailEngine("acquire_reader", ex);
            ensureOpen(ex); // throw EngineCloseException here if we are already closed
            logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex);
            throw new EngineException(shardId, "failed to acquire reader", ex);
        } finally {
            Releasables.close(releasable);
        }
    }
        // org.elasticsearch.index.engine.Engine.SearcherSupplier#acquireSearcher
        public final Searcher acquireSearcher(String source) {
            if (released.get()) {
                throw new AlreadyClosedException("SearcherSupplier was closed");
            }
            // Create a new searcher
            final Searcher searcher = acquireSearcherInternal(source);
            return CAN_MATCH_SEARCH_SOURCE.equals(source) ? searcher : wrapper.apply(searcher);
        }
        // org.elasticsearch.index.engine.Engine.Searcher#Searcher
        public Searcher(String source, IndexReader reader,
                        Similarity similarity, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
                        Closeable onClose) {
            super(reader);
            setSimilarity(similarity);
            setQueryCache(queryCache);
            setQueryCachingPolicy(queryCachingPolicy);
            this.source = source;
            this.onClose = onClose;
        }
    // org.elasticsearch.index.shard.IndexShard#wrapSearcher
    private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
        assert ElasticsearchDirectoryReader.unwrap(searcher.getDirectoryReader())
            != null : "DirectoryReader must be an instance or ElasticsearchDirectoryReader";
        boolean success = false;
        try {
            final Engine.Searcher newSearcher = readerWrapper == null ? searcher : wrapSearcher(searcher, readerWrapper);
            assert newSearcher != null;
            success = true;
            return newSearcher;
        } catch (IOException ex) {
            throw new ElasticsearchException("failed to wrap searcher", ex);
        } finally {
            if (success == false) {
                Releasables.close(success, searcher);
            }
        }
    }
    // org.elasticsearch.index.shard.IndexShard#wrapSearcher
    static Engine.Searcher wrapSearcher(Engine.Searcher engineSearcher,
                                        CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper) throws IOException {
        assert readerWrapper != null;
        final ElasticsearchDirectoryReader elasticsearchDirectoryReader =
            ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(engineSearcher.getDirectoryReader());
        if (elasticsearchDirectoryReader == null) {
            throw new IllegalStateException("Can't wrap non elasticsearch directory reader");
        }
        NonClosingReaderWrapper nonClosingReaderWrapper = new NonClosingReaderWrapper(engineSearcher.getDirectoryReader());
        DirectoryReader reader = readerWrapper.apply(nonClosingReaderWrapper);
        if (reader != nonClosingReaderWrapper) {
            if (reader.getReaderCacheHelper() != elasticsearchDirectoryReader.getReaderCacheHelper()) {
                throw new IllegalStateException("wrapped directory reader doesn't delegate IndexReader#getCoreCacheKey," +
                    " wrappers must override this method and delegate to the original readers core cache key. Wrapped readers can't be " +
                    "used as cache keys since their are used only per request which would lead to subtle bugs");
            }
            if (ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(reader) != elasticsearchDirectoryReader) {
                // prevent that somebody wraps with a non-filter reader
                throw new IllegalStateException("wrapped directory reader hides actual ElasticsearchDirectoryReader but shouldn't");
            }
        }

        if (reader == nonClosingReaderWrapper) {
            return engineSearcher;
        } else {
            // we close the reader to make sure wrappers can release resources if needed....
            // our NonClosingReaderWrapper makes sure that our reader is not closed
            return new Engine.Searcher(engineSearcher.source(), reader,
                engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(),
                () -> IOUtils.close(reader, // this will close the wrappers excluding the NonClosingReaderWrapper
                    engineSearcher)); // this will run the closeable on the wrapped engine reader
        }
    }
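
The per-key locking used by `acquireLock` can be easier to follow in isolation. Below is a minimal sketch of the same idea, assuming a cut-down class of my own: `SimpleKeyedLock`, `CountedLock` and `Handle` are hypothetical names for illustration and are not the Elasticsearch classes. Each key maps to a `ReentrantLock` plus a counter, and the map entry is dropped once the last holder releases it.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified analogue of KeyedLock; not the Elasticsearch class.
final class SimpleKeyedLock<K> {

    /** Handle returned by acquire(); closing it releases the per-key lock. */
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    /** A ReentrantLock that also counts how many acquisitions reference it. */
    private static final class CountedLock extends ReentrantLock {
        final AtomicInteger count = new AtomicInteger(1);
    }

    private final ConcurrentHashMap<K, CountedLock> map = new ConcurrentHashMap<>();

    /** Blocks until the lock for the given key is held by the calling thread. */
    Handle acquire(K key) {
        while (true) {
            CountedLock existing = map.get(key);
            if (existing == null) {
                CountedLock fresh = new CountedLock();
                fresh.lock();
                if (map.putIfAbsent(key, fresh) == null) {
                    return () -> release(key, fresh);
                }
                // Lost the race to another thread: loop and use its lock instead.
            } else {
                int c = existing.count.get();
                // c == 0 means the last holder is about to remove the entry, so retry.
                if (c > 0 && existing.count.compareAndSet(c, c + 1)) {
                    existing.lock();
                    return () -> release(key, existing);
                }
            }
        }
    }

    private void release(K key, CountedLock lock) {
        lock.unlock();
        if (lock.count.decrementAndGet() == 0) {
            map.remove(key, lock); // last holder: drop the entry so the map stays small
        }
    }

    /** Number of keys that currently have a lock entry. */
    int trackedKeys() {
        return map.size();
    }

    public static void main(String[] args) {
        SimpleKeyedLock<String> locks = new SimpleKeyedLock<>();
        try (Handle outer = locks.acquire("doc-1");
             Handle inner = locks.acquire("doc-1")) { // the same thread may re-enter
            System.out.println("keys while held: " + locks.trackedKeys());
        }
        System.out.println("keys after release: " + locks.trackedKeys());
    }
}
```

The counter is what makes it safe to remove entries: a thread only joins an existing lock if it can bump a count that is still above zero, otherwise it retries and creates a fresh entry.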

Once the searcher instance has been acquired, it is used to perform the actual get.

Code language: java
// org.elasticsearch.index.engine.Engine#getFromSearcher
    protected final GetResult getFromSearcher(Get get, Engine.Searcher searcher) throws EngineException {
        final DocIdAndVersion docIdAndVersion;
        try {
            // Load the docId and version; this is one of the core read paths
            docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.getIndexReader(), get.uid(), true);
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(searcher);
            //TODO: A better exception goes here
            throw new EngineException(shardId, "Couldn't resolve version", e);
        }

        if (docIdAndVersion != null) {
            // Check for a version conflict and throw if there is one
            if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
                Releasables.close(searcher);
                throw new VersionConflictEngineException(shardId, get.id(),
                        get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
            }
            if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                get.getIfSeqNo() != docIdAndVersion.seqNo || get.getIfPrimaryTerm() != docIdAndVersion.primaryTerm
            )) {
                Releasables.close(searcher);
                throw new VersionConflictEngineException(shardId, get.id(),
                    get.getIfSeqNo(), get.getIfPrimaryTerm(), docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
            }
        }

        if (docIdAndVersion != null) {
            // don't release the searcher on this path, it is the
            // responsibility of the caller to call GetResult.release
            return new GetResult(searcher, docIdAndVersion, false);
        } else {
            Releasables.close(searcher);
            return GetResult.NOT_EXISTS;
        }
    }
    // org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#loadDocIdAndVersion
    /**
     * Load the internal doc ID and version for the uid from the reader, returning<ul>
     * <li>null if the uid wasn't found,
     * <li>a doc ID and a version otherwise
     * </ul>
     */
    public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
        // field: _id
        PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
        List<LeafReaderContext> leaves = reader.leaves();
        // iterate backwards to optimize for the frequently updated documents
        // which are likely to be in the last segments
        for (int i = leaves.size() - 1; i >= 0; i--) {
            final LeafReaderContext leaf = leaves.get(i);
            PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
            DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
            if (result != null) {
                return result;
            }
        }
        return null;
    }
    // org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver#getLookupState
    private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader reader, String uidField) throws IOException {
        // We cache on the top level
        // This means cache entries have a shorter lifetime, maybe as low as 1s with the
        // default refresh interval and a steady indexing rate, but on the other hand it
        // proved to be cheaper than having to perform a CHM and a TL get for every segment.
        // See https://github.com/elastic/elasticsearch/pull/19856.
        IndexReader.CacheHelper cacheHelper = reader.getReaderCacheHelper();
        CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> ctl = lookupStates.get(cacheHelper.getKey());
        if (ctl == null) {
            // First time we are seeing this reader's core; make a new CTL:
            ctl = new CloseableThreadLocal<>();
            CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup[]> other = lookupStates.putIfAbsent(cacheHelper.getKey(), ctl);
            if (other == null) {
                // Our CTL won, we must remove it when the reader is closed:
                cacheHelper.addClosedListener(removeLookupState);
            } else {
                // Another thread beat us to it: just use their CTL:
                ctl = other;
            }
        }

        PerThreadIDVersionAndSeqNoLookup[] lookupState = ctl.get();
        if (lookupState == null) {
            lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
            for (LeafReaderContext leaf : reader.leaves()) {
                lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), uidField);
            }
            ctl.set(lookupState);
        }

        if (lookupState.length != reader.leaves().size()) {
            throw new AssertionError("Mismatched numbers of leaves: " + lookupState.length + " != " + reader.leaves().size());
        }

        if (lookupState.length > 0 && Objects.equals(lookupState[0].uidField, uidField) == false) {
            throw new AssertionError("Index does not consistently use the same uid field: ["
                    + uidField + "] != [" + lookupState[0].uidField + "]");
        }

        return lookupState;
    }
  // org.apache.lucene.index.IndexReader#leaves
  /**
   * Returns the reader's leaves, or itself if this reader is atomic.
   * This is a convenience method calling {@code this.getContext().leaves()}.
   * @see IndexReaderContext#leaves()
   */
  public final List<LeafReaderContext> leaves() {
    return getContext().leaves();
  }
  // org.apache.lucene.index.CompositeReader#getContext
  @Override
  public final CompositeReaderContext getContext() {
    ensureOpen();
    // lazy init without thread safety for perf reasons: Building the readerContext twice does not hurt!
    if (readerContext == null) {
      assert getSequentialSubReaders() != null;
      readerContext = CompositeReaderContext.create(this);
    }
    return readerContext;
  }
  // org.apache.lucene.index.CompositeReaderContext#leaves
  @Override
  public List<LeafReaderContext> leaves() throws UnsupportedOperationException {
    if (!isTopLevel)
      throw new UnsupportedOperationException("This is not a top-level context.");
    assert leaves != null;
    return leaves;
  }
      // org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#lookupVersion
    /** Return null if id is not found.
     * We pass the {@link LeafReaderContext} as an argument so that things
     * still work with reader wrappers that hide some documents while still
     * using the same cache key. Otherwise we'd have to disable caching
     * entirely for these readers.
     */
    public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
        throws IOException {
        assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
            "context's reader is not the same as the reader class was initialized on.";
        // docID is an int: Lucene's internal document number within this segment
        int docID = getDocID(id, context);

        if (docID != DocIdSetIterator.NO_MORE_DOCS) {
            final long seqNo;
            final long term;
            if (loadSeqNo) {
                seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
                term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
            } else {
                seqNo = UNASSIGNED_SEQ_NO;
                term = UNASSIGNED_PRIMARY_TERM;
            }
            final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
            return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
        } else {
            return null;
        }
    }
    // org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#getDocID
    /**
     * returns the internal lucene doc id for the given id bytes.
     * {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
     * */
    private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
        // termsEnum can possibly be null here if this leaf contains only no-ops.
        // Lucene API: TermsEnum#seekExact
        if (termsEnum != null && termsEnum.seekExact(id)) {
            final Bits liveDocs = context.reader().getLiveDocs();
            int docID = DocIdSetIterator.NO_MORE_DOCS;
            // there may be more than one matching docID, in the case of nested docs, so we want the last one:
            docsEnum = termsEnum.postings(docsEnum, 0);
            for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
                if (liveDocs != null && liveDocs.get(d) == false) {
                    continue;
                }
                docID = d;
            }
            return docID;
        } else {
            return DocIdSetIterator.NO_MORE_DOCS;
        }
    }

    // org.elasticsearch.common.lucene.uid.PerThreadIDVersionAndSeqNoLookup#readNumericDocValues
    private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
        final NumericDocValues dv = reader.getNumericDocValues(field);
        if (dv == null || dv.advanceExact(docId) == false) {
            assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
            throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
        }
        return dv.longValue();
    }
    // Wrapper around the docId
    // org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
    /** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
    public static class DocIdAndVersion {
        public final int docId;
        public final long version;
        public final long seqNo;
        public final long primaryTerm;
        public final LeafReader reader;
        public final int docBase;

        public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
            this.docId = docId;
            this.version = version;
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.reader = reader;
            this.docBase = docBase;
        }
    }
        // org.elasticsearch.index.VersionType#isVersionConflictForReads
        @Override
        public boolean isVersionConflictForReads(long currentVersion, long expectedVersion) {
            return isVersionConflict(currentVersion, expectedVersion, false);
        }
        private boolean isVersionConflict(long currentVersion, long expectedVersion, boolean deleted) {
            if (expectedVersion == Versions.MATCH_ANY) {
                return false;
            }
            if (expectedVersion == Versions.MATCH_DELETED) {
                return deleted == false;
            }
            if (currentVersion != expectedVersion) {
                return true;
            }
            return false;
        }
        // Finally, build the GetResult and return it
        // org.elasticsearch.index.engine.Engine.GetResult#GetResult
        public GetResult(Engine.Searcher searcher, DocIdAndVersion docIdAndVersion, boolean fromTranslog) {
            this(true, docIdAndVersion.version, docIdAndVersion, searcher, fromTranslog);
        }

        private GetResult(boolean exists, long version, DocIdAndVersion docIdAndVersion, Engine.Searcher searcher, boolean fromTranslog) {
            this.exists = exists;
            this.version = version;
            this.docIdAndVersion = docIdAndVersion;
            this.searcher = searcher;
            this.fromTranslog = fromTranslog;
            assert fromTranslog == false || searcher.getIndexReader() instanceof TranslogLeafReader;
        }
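
The lookup in `loadDocIdAndVersion` and `getDocID` boils down to two rules: walk the segments from newest to oldest, and within a segment keep the last posting that is still live. Here is a minimal sketch of those two rules using plain Java collections instead of Lucene. Everything in it (`SegmentIdLookup`, `Segment`, the map-based postings) is a hypothetical model for illustration, not the Lucene or Elasticsearch API.

```java
import java.util.BitSet;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the id lookup; it does not use Lucene.
final class SegmentIdLookup {
    static final int NOT_FOUND = -1;

    /** Stand-in for one segment: postings per id, plus live-docs bits. */
    static final class Segment {
        final Map<String, int[]> postings; // id -> ascending doc numbers in this segment
        final BitSet liveDocs;             // null means "no deletions in this segment"

        Segment(Map<String, int[]> postings, BitSet liveDocs) {
            this.postings = postings;
            this.liveDocs = liveDocs;
        }
    }

    /** Returns the last live doc number for the id in one segment, or NOT_FOUND. */
    static int docIdInSegment(Segment segment, String id) {
        int[] docs = segment.postings.get(id);
        if (docs == null) {
            return NOT_FOUND;
        }
        int found = NOT_FOUND;
        for (int d : docs) {
            if (segment.liveDocs != null && !segment.liveDocs.get(d)) {
                continue; // deleted document: skip it
            }
            found = d; // keep overwriting so the last live match wins
        }
        return found;
    }

    /** Returns {segmentIndex, docId} for the id, or null if no segment has a live match. */
    static int[] lookup(List<Segment> segments, String id) {
        // Newest segments sit at the end, and recently updated ids tend to live there.
        for (int i = segments.size() - 1; i >= 0; i--) {
            int doc = docIdInSegment(segments.get(i), id);
            if (doc != NOT_FOUND) {
                return new int[] {i, doc};
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BitSet live0 = new BitSet();
        live0.set(1); // in segment 0, doc 0 is deleted and doc 1 is live
        Segment s0 = new Segment(Map.of("a", new int[] {0}, "b", new int[] {1}), live0);
        Segment s1 = new Segment(Map.of("a", new int[] {3}), null);
        int[] hit = lookup(List.of(s0, s1), "a");
        System.out.println("segment=" + hit[0] + " doc=" + hit[1]);
    }
}
```

An update in this model is a delete in the old segment plus a new document in a newer one, which is why scanning backwards usually finds the hit early.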

The previous step resolved the id to a docId through the Lucene API. Next, the document itself is loaded by that docId:

Code language: java
// org.elasticsearch.index.get.ShardGetService#innerGetLoadFromStoredFields
    private GetResult innerGetLoadFromStoredFields(String type, String id, String[] storedFields, FetchSourceContext fetchSourceContext,
                                                   Engine.GetResult get, MapperService mapperService) {
        assert get.exists() : "method should only be called if document could be retrieved";

        // check first if stored fields to be loaded don't contain an object field
        DocumentMapper docMapper = mapperService.documentMapper();
        if (storedFields != null) {
            for (String field : storedFields) {
                Mapper fieldMapper = docMapper.mappers().getMapper(field);
                if (fieldMapper == null) {
                    if (docMapper.mappers().objectMappers().get(field) != null) {
                        // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                        throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                    }
                }
            }
        }

        Map<String, DocumentField> documentFields = null;
        Map<String, DocumentField> metadataFields = null;
        BytesReference source = null;
        DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
        // force fetching source if we read from translog and need to recreate stored fields
        boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
                Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
        // Build the fields visitor
        FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
            forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
        if (fieldVisitor != null) {
            try {
                // Load the document's stored fields through the visitor
                docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "]", e);
            }
            source = fieldVisitor.source();

            // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
            if (get.isFromTranslog()) {
                // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
                // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
                if (forceSourceForComputingTranslogStoredFields == false) {
                    try {
                        source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
                    } catch (IOException e) {
                        throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
                    }
                } else {
                    // Slow path: recreate stored fields from original source
                    assert source != null : "original source in translog must exist";
                    SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), type, id, source,
                        XContentHelper.xContentType(source), fieldVisitor.routing());
                    ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
                    assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
                    // update special fields
                    doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
                    doc.version().setLongValue(docIdAndVersion.version);

                    // retrieve stored fields from parsed doc
                    fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
                    for (IndexableField indexableField : doc.rootDoc().getFields()) {
                        IndexableFieldType fieldType = indexableField.fieldType();
                        if (fieldType.stored()) {
                            FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
                            if (status == StoredFieldVisitor.Status.YES) {
                                if (indexableField.numericValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
                                } else if (indexableField.binaryValue() != null) {
                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
                                } else if (indexableField.stringValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
                                }
                            } else if (status == StoredFieldVisitor.Status.STOP) {
                                break;
                            }
                        }
                    }
                    // retrieve source (with possible transformations, e.g. source filters
                    source = fieldVisitor.source();
                }
            }

            // put stored fields into result objects
            if (!fieldVisitor.fields().isEmpty()) {
                fieldVisitor.postProcess(mapperService::fieldType,
                    mapperService.documentMapper() == null ? null : mapperService.documentMapper().type());
                documentFields = new HashMap<>();
                metadataFields = new HashMap<>();
                for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                    if (mapperService.isMetadataField(entry.getKey())) {
                        metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    } else {
                        documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    }
                }
            }
        }

        if (source != null) {
            // apply request-level source filtering
            if (fetchSourceContext.fetchSource() == false) {
                source = null;
            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
                Map<String, Object> sourceAsMap;
                // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
                //  Do we care?
                Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
                XContentType sourceContentType = typeMapTuple.v1();
                sourceAsMap = typeMapTuple.v2();
                sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
                try {
                    source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
                } catch (IOException e) {
                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
                }
            }
        }

        if (!fetchSourceContext.fetchSource()) {
            source = null;
        }

        if (source != null && get.isFromTranslog()) {
            // reapply source filters from mapping (possibly also nulling the source)
            try {
                source = docMapper.sourceMapper().applyFilters(source, null);
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
            }
        }

        if (source != null && (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0)) {
            Map<String, Object> sourceAsMap;
            // TODO: The source might parsed and available in the sourceLookup but that one uses unordered maps so different. Do we care?
            Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
            XContentType sourceContentType = typeMapTuple.v1();
            sourceAsMap = typeMapTuple.v2();
            sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
            try {
                source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to get type [" + type + "] and id [" + id + "] with includes/excludes set", e);
            }
        }
        // Build a new GetResult to return; source is the final query result
        return new GetResult(shardId.getIndexName(), type, id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
            get.version(), get.exists(), source, documentFields, metadataFields);
    }

    private static FieldsVisitor buildFieldsVisitors(String[] fields, FetchSourceContext fetchSourceContext) {
        if (fields == null || fields.length == 0) {
            return fetchSourceContext.fetchSource() ? new FieldsVisitor(true) : null;
        }

        return new CustomFieldsVisitor(Sets.newHashSet(fields), fetchSourceContext.fetchSource());
    }
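
The request-level source filtering step above (includes/excludes) can be pictured with a much smaller example. The sketch below is a deliberate simplification under my own assumptions: it only matches exact top-level field names, whereas the real `XContentMapValues.filter` also handles nested paths and wildcards. `SourceFilterSketch` is a hypothetical name, and the rule that an exclude wins over an include is how I understand the Elasticsearch behaviour, so treat it as an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for include/exclude filtering of _source.
final class SourceFilterSketch {

    /**
     * Keeps a top-level field when it is included (or no includes were given)
     * and it is not listed in excludes. Exact names only, no wildcards.
     */
    static Map<String, Object> filter(Map<String, Object> source, Set<String> includes, Set<String> excludes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            boolean included = includes.isEmpty() || includes.contains(entry.getKey());
            if (included && !excludes.contains(entry.getKey())) {
                out.put(entry.getKey(), entry.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "get by id");
        source.put("body", "some text");
        source.put("secret", "hidden");
        // Empty includes means "everything", minus the excluded field.
        System.out.println(filter(source, Set.of(), Set.of("secret")).keySet());
    }
}
```

When neither list is set, the real code path skips this work entirely and returns the stored `_source` bytes as they are, which is the cheap case for a plain get.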

Finally, here is a fragment of Lucene's lookup implementation, for reference.

Code language: java
// Seek directly to the term for the given id; this is the equivalent of an index lookup
  // org.apache.lucene.codecs.blocktree.SegmentTermsEnum#seekExact(org.apache.lucene.util.BytesRef)
  @Override
  public boolean seekExact(BytesRef target) throws IOException {

    if (fr.index == null) {
      throw new IllegalStateException("terms index was not loaded");
    }

    if (fr.size() > 0 && (target.compareTo(fr.getMin()) < 0 || target.compareTo(fr.getMax()) > 0)) {
        return false;
    }

    term.grow(1 + target.length);

    assert clearEOF();

    // if (DEBUG) {
    //   System.out.println("\nBTTR.seekExact seg=" + fr.parent.segment + " target=" + fr.fieldInfo.name + ":" + brToString(target) + " current=" + brToString(term) + " (exists?=" + termExists + ") validIndexPrefix=" + validIndexPrefix);
    //   printSeekState(System.out);
    // }

    FST.Arc<BytesRef> arc;
    int targetUpto;
    BytesRef output;

    targetBeforeCurrentLength = currentFrame.ord;

    if (currentFrame != staticFrame) {

      // We are already seek'd; find the common
      // prefix of new seek term vs current term and
      // re-use the corresponding seek state.  For
      // example, if app first seeks to foobar, then
      // seeks to foobaz, we can re-use the seek state
      // for the first 5 bytes.

      // if (DEBUG) {
      //   System.out.println("  re-use current seek state validIndexPrefix=" + validIndexPrefix);
      // }

      arc = arcs[0];
      assert arc.isFinal();
      output = arc.output();
      targetUpto = 0;

      SegmentTermsEnumFrame lastFrame = stack[0];
      assert validIndexPrefix <= term.length();

      final int targetLimit = Math.min(target.length, validIndexPrefix);

      int cmp = 0;

      // TODO: reverse vLong byte order for better FST
      // prefix output sharing

      // First compare up to valid seek frames:
      while (targetUpto < targetLimit) {
        cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
        // if (DEBUG) {
        //    System.out.println("    cycle targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")"   + " arc.output=" + arc.output + " output=" + output);
        // }
        if (cmp != 0) {
          break;
        }
        arc = arcs[1+targetUpto];
        assert arc.label() == (target.bytes[target.offset + targetUpto] & 0xFF): "arc.label=" + (char) arc.label() + " targetLabel=" + (char) (target.bytes[target.offset + targetUpto] & 0xFF);
        if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
          output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
        }
        if (arc.isFinal()) {
          lastFrame = stack[1+lastFrame.ord];
        }
        targetUpto++;
      }

      if (cmp == 0) {
        final int targetUptoMid = targetUpto;

        // Second compare the rest of the term, but
        // don't save arc/output/frame; we only do this
        // to find out if the target term is before,
        // equal or after the current term
        final int targetLimit2 = Math.min(target.length, term.length());
        while (targetUpto < targetLimit2) {
          cmp = (term.byteAt(targetUpto)&0xFF) - (target.bytes[target.offset + targetUpto]&0xFF);
          // if (DEBUG) {
          //    System.out.println("    cycle2 targetUpto=" + targetUpto + " (vs limit=" + targetLimit + ") cmp=" + cmp + " (targetLabel=" + (char) (target.bytes[target.offset + targetUpto]) + " vs termLabel=" + (char) (term.bytes[targetUpto]) + ")");
          // }
          if (cmp != 0) {
            break;
          }
          targetUpto++;
        }

        if (cmp == 0) {
          cmp = term.length() - target.length;
        }
        targetUpto = targetUptoMid;
      }

      if (cmp < 0) {
        // Common case: target term is after current
        // term, ie, app is seeking multiple terms
        // in sorted order
        // if (DEBUG) {
        //   System.out.println("  target is after current (shares prefixLen=" + targetUpto + "); frame.ord=" + lastFrame.ord);
        // }
        currentFrame = lastFrame;

      } else if (cmp > 0) {
        // Uncommon case: target term
        // is before current term; this means we can
        // keep the currentFrame but we must rewind it
        // (so we scan from the start)
        targetBeforeCurrentLength = lastFrame.ord;
        // if (DEBUG) {
        //   System.out.println("  target is before current (shares prefixLen=" + targetUpto + "); rewind frame ord=" + lastFrame.ord);
        // }
        currentFrame = lastFrame;
        currentFrame.rewind();
      } else {
        // Target is exactly the same as current term
        assert term.length() == target.length;
        if (termExists) {
          // if (DEBUG) {
          //   System.out.println("  target is same as current; return true");
          // }
          return true;
        } else {
          // if (DEBUG) {
          //   System.out.println("  target is same as current but term doesn't exist");
          // }
        }
        //validIndexPrefix = currentFrame.depth;
        //term.length = target.length;
        //return termExists;
      }

    } else {

      targetBeforeCurrentLength = -1;
      arc = fr.index.getFirstArc(arcs[0]);

      // Empty string prefix must have an output (block) in the index!
      assert arc.isFinal();
      assert arc.output() != null;

      // if (DEBUG) {
      //   System.out.println("    no seek state; push root frame");
      // }

      output = arc.output();

      currentFrame = staticFrame;

      //term.length = 0;
      targetUpto = 0;
      currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), 0);
    }

    // if (DEBUG) {
    //   System.out.println("  start index loop targetUpto=" + targetUpto + " output=" + output + " currentFrame.ord=" + currentFrame.ord + " targetBeforeCurrentLength=" + targetBeforeCurrentLength);
    // }

    // We are done sharing the common prefix with the incoming target and where we are currently seek'd; now continue walking the index:
    while (targetUpto < target.length) {

      final int targetLabel = target.bytes[target.offset + targetUpto] & 0xFF;

      final FST.Arc<BytesRef> nextArc = fr.index.findTargetArc(targetLabel, arc, getArc(1+targetUpto), fstReader);

      if (nextArc == null) {

        // Index is exhausted
        // if (DEBUG) {
        //   System.out.println("    index: index exhausted label=" + ((char) targetLabel) + " " + toHex(targetLabel));
        // }

        validIndexPrefix = currentFrame.prefix;
        //validIndexPrefix = targetUpto;

        currentFrame.scanToFloorFrame(target);

        if (!currentFrame.hasTerms) {
          termExists = false;
          term.setByteAt(targetUpto, (byte) targetLabel);
          term.setLength(1+targetUpto);
          // if (DEBUG) {
          //   System.out.println("  FAST NOT_FOUND term=" + brToString(term));
          // }
          return false;
        }

        currentFrame.loadBlock();

        final SeekStatus result = currentFrame.scanToTerm(target, true);            
        if (result == SeekStatus.FOUND) {
          // if (DEBUG) {
          //   System.out.println("  return FOUND term=" + term.utf8ToString() + " " + term);
          // }
          return true;
        } else {
          // if (DEBUG) {
          //   System.out.println("  got " + result + "; return NOT_FOUND term=" + brToString(term));
          // }
          return false;
        }
      } else {
        // Follow this arc
        arc = nextArc;
        term.setByteAt(targetUpto, (byte) targetLabel);
        // Aggregate output as we go:
        assert arc.output() != null;
        if (arc.output() != BlockTreeTermsReader.NO_OUTPUT) {
          output = BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.output());
        }

        // if (DEBUG) {
        //   System.out.println("    index: follow label=" + toHex(target.bytes[target.offset + targetUpto]&0xff) + " arc.output=" + arc.output + " arc.nfo=" + arc.nextFinalOutput);
        // }
        targetUpto++;

        if (arc.isFinal()) {
          //if (DEBUG) System.out.println("    arc is final!");
          currentFrame = pushFrame(arc, BlockTreeTermsReader.FST_OUTPUTS.add(output, arc.nextFinalOutput()), targetUpto);
          //if (DEBUG) System.out.println("    curFrame.ord=" + currentFrame.ord + " hasTerms=" + currentFrame.hasTerms);
        }
      }
    }

    //validIndexPrefix = targetUpto;
    validIndexPrefix = currentFrame.prefix;

    currentFrame.scanToFloorFrame(target);

    // Target term is entirely contained in the index:
    if (!currentFrame.hasTerms) {
      termExists = false;
      term.setLength(targetUpto);
      // if (DEBUG) {
      //   System.out.println("  FAST NOT_FOUND term=" + brToString(term));
      // }
      return false;
    }

    currentFrame.loadBlock();

    final SeekStatus result = currentFrame.scanToTerm(target, true);            
    if (result == SeekStatus.FOUND) {
      // if (DEBUG) {
      //   System.out.println("  return FOUND term=" + term.utf8ToString() + " " + term);
      // }
      return true;
    } else {
      // if (DEBUG) {
      //   System.out.println("  got result " + result + "; return NOT_FOUND term=" + term.utf8ToString());
      // }

      return false;
    }
  }
  // org.apache.lucene.codecs.blocktree.SegmentTermsEnum#postings
  @Override
  public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException {
    assert !eof;
    //if (DEBUG) {
    //System.out.println("BTTR.docs seg=" + segment);
    //}
    currentFrame.decodeMetaData();
    //if (DEBUG) {
    //System.out.println("  state=" + currentFrame.state);
    //}
    return fr.parent.postingsReader.postings(fr.fieldInfo, currentFrame.state, reuse, flags);
  }
  // Fetch the document's stored fields by docId
  // org.apache.lucene.index.FilterLeafReader#document
  @Override
  public void document(int docID, StoredFieldVisitor visitor) throws IOException {
    ensureOpen();
    in.document(docID, visitor);
  }
  // org.apache.lucene.index.CodecReader#document
  @Override
  public final void document(int docID, StoredFieldVisitor visitor) throws IOException {
    checkBounds(docID);
    getFieldsReader().visitDocument(docID, visitor);
  }
  // org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#visitDocument
  @Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {

    final SerializedDocument doc = document(docID);

    for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
      final long infoAndBits = doc.in.readVLong();
      final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
      final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

      final int bits = (int) (infoAndBits & TYPE_MASK);
      assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);

      switch(visitor.needsField(fieldInfo)) {
        case YES:
          readField(doc.in, visitor, fieldInfo, bits);
          break;
        case NO:
          if (fieldIDX == doc.numStoredFields - 1) {// don't skipField on last field value; treat like STOP
            return;
          }
          skipField(doc.in, bits);
          break;
        case STOP:
          return;
      }
    }
  }
  // org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader#document
  SerializedDocument document(int docID) throws IOException {
    // Locate the document's position and read it out
    if (state.contains(docID) == false) {
      fieldsStream.seek(indexReader.getStartPointer(docID));
      state.reset(docID);
    }
    assert state.contains(docID);
    return state.document(docID);
  }
  // org.apache.lucene.codecs.compressing.FieldsIndexReader#getStartPointer
  @Override
  long getStartPointer(int docID) {
    FutureObjects.checkIndex(docID, maxDoc);
    long blockIndex = docs.binarySearch(0, numChunks, docID);
    if (blockIndex < 0) {
      blockIndex = -2 - blockIndex;
    }
    return startPointers.get(blockIndex);
  }
      // org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.BlockState#document
    /**
     * Get the serialized representation of the given docID. This docID has
     * to be contained in the current block.
     */
    SerializedDocument document(int docID) throws IOException {
      if (contains(docID) == false) {
        throw new IllegalArgumentException();
      }

      final int index = docID - docBase;
      final int offset = Math.toIntExact(offsets[index]);
      final int length = Math.toIntExact(offsets[index+1]) - offset;
      final int totalLength = Math.toIntExact(offsets[chunkDocs]);
      final int numStoredFields = Math.toIntExact(this.numStoredFields[index]);

      final BytesRef bytes;
      if (merging) {
        bytes = this.bytes;
      } else {
        bytes = new BytesRef();
      }

      final DataInput documentInput;
      if (length == 0) {
        // empty
        documentInput = new ByteArrayDataInput();
      } else if (merging) {
        // already decompressed
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
      } else if (sliced) {
        fieldsStream.seek(startPointer);
        decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
        documentInput = new DataInput() {

          int decompressed = bytes.length;

          void fillBuffer() throws IOException {
            assert decompressed <= length;
            if (decompressed == length) {
              throw new EOFException();
            }
            final int toDecompress = Math.min(length - decompressed, chunkSize);
            decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
            decompressed += toDecompress;
          }

          @Override
          public byte readByte() throws IOException {
            if (bytes.length == 0) {
              fillBuffer();
            }
            --bytes.length;
            return bytes.bytes[bytes.offset++];
          }

          @Override
          public void readBytes(byte[] b, int offset, int len) throws IOException {
            while (len > bytes.length) {
              System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
              len -= bytes.length;
              offset += bytes.length;
              fillBuffer();
            }
            System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
            bytes.offset += len;
            bytes.length -= len;
          }

        };
      } else {
        fieldsStream.seek(startPointer);
        decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
        assert bytes.length == length;
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
      }

      return new SerializedDocument(documentInput, length, numStoredFields);
    }
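
The chunk lookup in getStartPointer is easy to miss among the surrounding code, so here is a minimal sketch of the same idea using plain arrays. It assumes that the reader's binarySearch uses the same miss convention as `java.util.Arrays.binarySearch` (a negative value encoding the insertion point). `ChunkLookup` and the sample arrays are made up for illustration.

```java
import java.util.Arrays;

final class ChunkLookup {

    /**
     * firstDocOfChunk[i] is the first docID stored in chunk i (ascending, starting at 0);
     * startPointers[i] is the file offset at which chunk i begins.
     */
    static long startPointer(long[] firstDocOfChunk, long[] startPointers, int docID) {
        int idx = Arrays.binarySearch(firstDocOfChunk, docID);
        if (idx < 0) {
            // On a miss the result is -(insertionPoint) - 1, so -2 - idx equals
            // insertionPoint - 1: the last chunk that starts at or before docID.
            idx = -2 - idx;
        }
        return startPointers[idx];
    }

    public static void main(String[] args) {
        long[] firstDoc = {0, 128, 300};      // sample data, not taken from a real index
        long[] pointers = {0L, 4096L, 9000L};
        System.out.println(startPointer(firstDoc, pointers, 299));
    }
}
```

Only the first docID of each chunk has to be indexed. The document is then located inside the decompressed chunk through the per-document offsets, as BlockState#document does above.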

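5. Key data structures

The main outward-facing data structures for get are GetRequest (the request), DocIdAndVersion (used during lookup), GetResult (the actual result), and GetResponse (the response returned to the caller). All of them have appeared above; they are gathered here for easy reference.

0. The get request data structure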

// org.elasticsearch.action.get.GetRequest
public class GetRequest extends SingleShardRequest<GetRequest> implements RealtimeRequest {

    private String type;
    private String id;
    private String routing;
    private String preference;

    private String[] storedFields;

    private FetchSourceContext fetchSourceContext;

    private boolean refresh = false;

    boolean realtime = true;

    private VersionType versionType = VersionType.INTERNAL;
    private long version = Versions.MATCH_ANY;

    public GetRequest() {
        type = MapperService.SINGLE_MAPPING_NAME;
    }

    GetRequest(StreamInput in) throws IOException {
        super(in);
        type = in.readString();
        id = in.readString();
        routing = in.readOptionalString();
        if (in.getVersion().before(Version.V_7_0_0)) {
            in.readOptionalString();
        }
        preference = in.readOptionalString();
        refresh = in.readBoolean();
        storedFields = in.readOptionalStringArray();
        realtime = in.readBoolean();

        this.versionType = VersionType.fromValue(in.readByte());
        this.version = in.readLong();
        fetchSourceContext = in.readOptionalWriteable(FetchSourceContext::new);
    }
    ...
}
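
The `in.getVersion().before(...)` check in the stream constructor is a common wire-compatibility pattern: a field that older peers still send is read and thrown away so that the remaining fields line up. Below is a minimal sketch of that pattern using only `java.io`. `VersionedWire`, the integer versions and the field names are hypothetical and do not reflect the Elasticsearch transport format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class VersionedWire {

    static final int V7 = 7; // hypothetical version in which the legacy field was dropped

    /** Serializes id and routing; peers older than V7 also get the legacy field. */
    static byte[] write(int peerVersion, String id, String legacy, String routing) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(id);
            if (peerVersion < V7) {
                out.writeUTF(legacy);
            }
            out.writeUTF(routing);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns {id, routing}; the legacy field is consumed and discarded for old peers. */
    static String[] read(int peerVersion, byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            String id = in.readUTF();
            if (peerVersion < V7) {
                in.readUTF(); // keep the stream position aligned, ignore the value
            }
            String routing = in.readUTF();
            return new String[] {id, routing};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] fields = read(6, write(6, "1", "old", "r1"));
        System.out.println(fields[0] + " " + fields[1]);
    }
}
```

Both sides must key on the same negotiated version. If they do not, the reader treats the legacy bytes as the next field.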

1. The data structure holding docId and version

// org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion
    /** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
    public static class DocIdAndVersion {
        public final int docId;
        public final long version;
        public final long seqNo;
        public final long primaryTerm;
        public final LeafReader reader;
        public final int docBase;

        public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
            this.docId = docId;
            this.version = version;
            this.seqNo = seqNo;
            this.primaryTerm = primaryTerm;
            this.reader = reader;
            this.docBase = docBase;
        }
    }
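
The Javadoc above says that docId is relative to the segment's doc base. The following sketch shows how segment-relative and index-wide doc ids relate, under the assumption that each segment covers a contiguous range starting at its docBase. `SegmentDocIds` and the sample doc bases are hypothetical.

```java
final class SegmentDocIds {

    /** Index of the segment containing globalDocId; docBases[i] is the first global id of segment i. */
    static int segmentOf(int[] docBases, int globalDocId) {
        int lo = 0;
        int hi = docBases.length - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so the loop always makes progress
            if (docBases[mid] <= globalDocId) {
                lo = mid;
            } else {
                hi = mid - 1;
            }
        }
        return lo;
    }

    /** Converts an index-wide doc id into a segment-relative one. */
    static int toLocal(int[] docBases, int globalDocId) {
        return globalDocId - docBases[segmentOf(docBases, globalDocId)];
    }

    /** Converts a segment-relative doc id back into an index-wide one. */
    static int toGlobal(int docBase, int localDocId) {
        return docBase + localDocId;
    }

    public static void main(String[] args) {
        int[] docBases = {0, 100, 250}; // sample data: three segments
        System.out.println(segmentOf(docBases, 260) + " " + toLocal(docBases, 260));
    }
}
```

Carrying both `reader` and `docBase` in DocIdAndVersion lets the caller read stored fields from the segment reader with the local id and still derive the index-wide id when it needs one.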

2. The GetResult data structure

// org.elasticsearch.index.get.GetResult
public class GetResult implements Writeable, Iterable<DocumentField>, ToXContentObject {

    public static final String _INDEX = "_index";
    public static final String _TYPE = "_type";
    public static final String _ID = "_id";
    private static final String _VERSION = "_version";
    private static final String _SEQ_NO = "_seq_no";
    private static final String _PRIMARY_TERM = "_primary_term";
    private static final String FOUND = "found";
    private static final String FIELDS = "fields";

    private String index;
    private String type;
    private String id;
    private long version;
    private long seqNo;
    private long primaryTerm;
    private boolean exists;
    private final Map<String, DocumentField> documentFields;
    private final Map<String, DocumentField> metaFields;
    private Map<String, Object> sourceAsMap;
    private BytesReference source;
    private byte[] sourceAsBytes;

    public GetResult(StreamInput in) throws IOException {
        index = in.readString();
        type = in.readOptionalString();
        id = in.readString();
        if (in.getVersion().onOrAfter(Version.V_6_6_0)) {
            seqNo = in.readZLong();
            primaryTerm = in.readVLong();
        } else {
            seqNo = UNASSIGNED_SEQ_NO;
            primaryTerm = UNASSIGNED_PRIMARY_TERM;
        }
        version = in.readLong();
        exists = in.readBoolean();
        if (exists) {
            source = in.readBytesReference();
            if (source.length() == 0) {
                source = null;
            }
            if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
                documentFields = readFields(in);
                metaFields = readFields(in);
            } else {
                Map<String, DocumentField> fields = readFields(in);
                documentFields = new HashMap<>();
                metaFields = new HashMap<>();
                fields.forEach((fieldName, docField) ->
                    (MapperService.META_FIELDS_BEFORE_7DOT8.contains(fieldName) ? metaFields : documentFields).put(fieldName, docField));
            }
        } else {
            metaFields = Collections.emptyMap();
            documentFields = Collections.emptyMap();
        }
    }

    public GetResult(String index, String type, String id, long seqNo, long primaryTerm, long version, boolean exists,
                     BytesReference source, Map<String, DocumentField> documentFields, Map<String, DocumentField> metaFields) {
        this.index = index;
        this.type = type;
        this.id = id;
        this.seqNo = seqNo;
        this.primaryTerm = primaryTerm;
        assert (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) || (seqNo >= 0 && primaryTerm >= 1) :
            "seqNo: " + seqNo + " primaryTerm: " + primaryTerm;
        assert exists || (seqNo == UNASSIGNED_SEQ_NO && primaryTerm == UNASSIGNED_PRIMARY_TERM) :
            "doc not found but seqNo/primaryTerm are set";
        this.version = version;
        this.exists = exists;
        this.source = source;
        this.documentFields = documentFields == null ? emptyMap() : documentFields;
        this.metaFields = metaFields == null ? emptyMap() : metaFields;
    }
    ...
}
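
For streams from older nodes, the constructor above reads one combined field map and splits it into metadata fields and document fields. A minimal sketch of that partitioning step follows. `FieldSplit` is hypothetical, and the `META` set is an assumed sample, not the actual contents of MapperService.META_FIELDS_BEFORE_7DOT8.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class FieldSplit {

    // Assumed sample of metadata field names, for illustration only.
    static final Set<String> META = new HashSet<>(Arrays.asList("_routing", "_ignored"));

    /** Routes every entry of 'all' into either 'meta' or 'doc', depending on its name. */
    static void split(Map<String, Object> all, Map<String, Object> meta, Map<String, Object> doc) {
        all.forEach((name, value) -> (META.contains(name) ? meta : doc).put(name, value));
    }

    public static void main(String[] args) {
        Map<String, Object> all = new HashMap<>();
        all.put("_routing", "user-1");
        all.put("title", "hello");

        Map<String, Object> meta = new HashMap<>();
        Map<String, Object> doc = new HashMap<>();
        split(all, meta, doc);
        System.out.println(meta.keySet() + " " + doc.keySet());
    }
}
```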

3. The get response: GetResponse

// org.elasticsearch.action.get.GetResponse
public class GetResponse extends ActionResponse implements Iterable<DocumentField>, ToXContentObject {

    GetResult getResult;

    GetResponse(StreamInput in) throws IOException {
        super(in);
        getResult = new GetResult(in);
    }

    public GetResponse(GetResult getResult) {
        this.getResult = getResult;
    }
    ...
}

Author: 等你归去来

Source: https://www.cnblogs.com/yougewe/p/14774934.html

Originally published 2021-05-26 on the WeChat official account JAVA烂猪皮.
