前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ES系列(六):search处理过程实现1框架

ES系列(六):search处理过程实现1框架

作者头像
烂猪皮
发布2021-07-16 10:19:57
3K0
发布2021-07-16 10:19:57
举报
文章被收录于专栏:JAVA烂猪皮JAVA烂猪皮JAVA烂猪皮

走过路过不要错过

点击蓝字关注我们

上一篇文章中,我们看了get在es的实现过程,虽只是一个简单的单条查询,但看起来实现却非常之复杂。纠其原因,是我们围绕了太多外围的东西讲了,而其核心则无外乎三点:1. 定义id对应的机器节点;2. 查找真正的docId;3. 查找docId对应的field信息;

本篇,我们再看另一个es的重要功能:search. 可以说,整个es就是立足于search的,所以,单就这事,足够我们啃上许久许久了。但,我们可以分步来,今日只需聊个大概框架,细节留待日后再说。实际上,平时我们聊事物时,又何偿不是在聊框架类的东西呢。只是有时候钻到牛角尖去,反倒能体现一个人的水平问题了。而针对这一点,则往往会牵出一个人的两个能力点:抓取问题核心的能力;深度理解和思考的能力。

闲话休绪,进入正题:search的处理框架。

1:searchAction框架

我们就以如下请求作为研究来源,即如何发起一个普通的search请求:

# 查找 test 索引中字段 name=ali 的记录
curl -X GET -H 'content-type:application/json' -d '{"query":{"match":{"name":"ali"}}}' http://localhost:9200/test/job/_search

如上备注所说,过滤条件其实就一个 name=ali, 我们可以很容易类比到sql中的表达:

select * from test where name='ali';

看起来问题并不复杂,那么es中又是如何处理该事务的呢?es中的包划分得比较清晰,比如http请求,会先交给rest包下实现处理,内部处理交由action模块处理,启动模块由bootstrap处理等等。总之,这是一个优秀应用必备一个特性:代码清晰易懂。

整个http请求的search入口包,其存放位置如下:

client实例负责许多的请求转发入口,负责与远程或者本机的es节点进行通讯,调度等重要工作。它是在es启动时初始化的一个重要实例,其存放位置如下:

action的内部请求定义包,用于在启动时注册处理器,以及在接收到http请求后,将其统一转发到内部节点处理,其存放位置如下:

最后,来看下 search 的语法包含哪些?这可以在每个具体Action的 routes() 方法中找到:

// org.elasticsearch.rest.action.search.RestSearchAction#routes
    @Override
    public List<Route> routes() {
        return unmodifiableList(asList(
            new Route(GET, "/_search"),
            new Route(POST, "/_search"),
            new Route(GET, "/{index}/_search"),
            new Route(POST, "/{index}/_search"),
            // Deprecated typed endpoints.
            new Route(GET, "/{index}/{type}/_search"),
            new Route(POST, "/{index}/{type}/_search")));
    }

即 search支持不带索引、带索引、带索引带type、GET/POST搜索, 可谓是语法宽松得很呐。

2. search的框架实现

本节我们就来看看实现search功能,es都是如何做的呢?

接到外部请求后,会交给nettyHandler, 然后交给RestController, 然后再找到具体的handler, 然后进行prepare, accept. 具体实现细节可以参考前几篇文章。这里只想说明,最终会交到 RestSearchAction 进行处理。而 RestSearchAction 继承了BaseRestHandler, 会统一走处理流程: prepareAction() -> 参数检查 -> 具体实现调用 ;

2.1. 搜索请求的接入

本部分主要讲解,当接到外部请求后,如果内部的searcher,因是有许多的线程池,以及许多的分布式节点,这前面许多工作并不会执行真正查询。故只会有一些分发起查询工作,参数解析等。我们可以先看个时序图,解其大致:

实现如下:

// org.elasticsearch.rest.BaseRestHandler#handleRequest
    @Override
    public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
        // prepare the request for execution; has the side effect of touching the request parameters
        // 具体的处理实现上下文准备
        final RestChannelConsumer action = prepareRequest(request, client);

        // validate unconsumed params, but we must exclude params used to format the response
        // use a sorted set so the unconsumed parameters appear in a reliable sorted order
        final SortedSet<String> unconsumedParams =
            request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

        // validate the non-response params
        if (!unconsumedParams.isEmpty()) {
            final Set<String> candidateParams = new HashSet<>();
            candidateParams.addAll(request.consumedParams());
            candidateParams.addAll(responseParams());
            throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
        }

        if (request.hasContent() && request.isContentConsumed() == false) {
            throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
        }

        usageCount.increment();
        // execute the action
        action.accept(channel);
    }
    // org.elasticsearch.rest.action.search.RestSearchAction#prepareRequest
    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        SearchRequest searchRequest = new SearchRequest();
        /*
         * We have to pull out the call to `source().size(size)` because
         * _update_by_query and _delete_by_query uses this same parsing
         * path but sets a different variable when it sees the `size`
         * url parameter.
         *
         * Note that we can't use `searchRequest.source()::size` because
         * `searchRequest.source()` is null right now. We don't have to
         * guard against it being null in the IntConsumer because it can't
         * be null later. If that is confusing to you then you are in good
         * company.
         */
        IntConsumer setSize = size -> searchRequest.source().size(size);
        // 解析参数
        request.withContentOrSourceParamParserOrNull(parser ->
            parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize));
        // 具体的search 业务处理入口
        return channel -> {
            RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
            cancelClient.execute(SearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
        };
    }

这前置工作,两个重点:1. 解析参数;2. 构建业务处理的consumer逻辑;其中,es中大量使用了lamda表达式,大大简化了java编程的繁文缛节,算是为java扳回点颜面。

search作为es中重要且复杂功能,其参数也是超级复杂,要想完全理解各参数,倒真是可以花上几篇的文章好好讲上几讲。不过想稍微多了解点,也可以展开下面的实现,看个大概。

// org.elasticsearch.rest.action.search.RestSearchAction#parseSearchRequest
    /**
     * Parses the rest request on top of the SearchRequest, preserving values that are not overridden by the rest request.
     *
     * @param requestContentParser body of the request to read. This method does not attempt to read the body from the {@code request}
     *        parameter
     * @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
     */
    public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
                                          XContentParser requestContentParser,
                                          NamedWriteableRegistry namedWriteableRegistry,
                                          IntConsumer setSize) throws IOException {

        if (searchRequest.source() == null) {
            searchRequest.source(new SearchSourceBuilder());
        }
        searchRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
        if (requestContentParser != null) {
            // 将外部请求转换为可读的格式,比如解析出 {"query":{"match":{"xx":"1"}}}
            // 此处相当于词法语法解析,有些难度呢
            searchRequest.source().parseXContent(requestContentParser, true);
        }

        final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
        searchRequest.setBatchedReduceSize(batchedReduceSize);
        if (request.hasParam("pre_filter_shard_size")) {
            searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE));
        }

        if (request.hasParam("max_concurrent_shard_requests")) {
            // only set if we have the parameter since we auto adjust the max concurrency on the coordinator
            // based on the number of nodes in the cluster
            final int maxConcurrentShardRequests = request.paramAsInt("max_concurrent_shard_requests",
                searchRequest.getMaxConcurrentShardRequests());
            searchRequest.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
        }

        if (request.hasParam("allow_partial_search_results")) {
            // only set if we have the parameter passed to override the cluster-level default
            searchRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", null));
        }

        // do not allow 'query_and_fetch' or 'dfs_query_and_fetch' search types
        // from the REST layer. these modes are an internal optimization and should
        // not be specified explicitly by the user.
        String searchType = request.param("search_type");
        if ("query_and_fetch".equals(searchType) ||
                "dfs_query_and_fetch".equals(searchType)) {
            throw new IllegalArgumentException("Unsupported search type [" + searchType + "]");
        } else {
            searchRequest.searchType(searchType);
        }
        // 解析顶层参数备用
        parseSearchSource(searchRequest.source(), request, setSize);
        searchRequest.requestCache(request.paramAsBoolean("request_cache", searchRequest.requestCache()));

        String scroll = request.param("scroll");
        if (scroll != null) {
            // scroll 解析,与时间相关
            searchRequest.scroll(new Scroll(parseTimeValue(scroll, null, "scroll")));
        }

        if (request.hasParam("type")) {
            deprecationLogger.deprecate("search_with_types", TYPES_DEPRECATION_MESSAGE);
            searchRequest.types(Strings.splitStringByCommaToArray(request.param("type")));
        }
        searchRequest.routing(request.param("routing"));
        searchRequest.preference(request.param("preference"));
        searchRequest.indicesOptions(IndicesOptions.fromRequest(request, searchRequest.indicesOptions()));

        checkRestTotalHits(request, searchRequest);

        if (searchRequest.pointInTimeBuilder() != null) {
            preparePointInTime(searchRequest, request, namedWriteableRegistry);
        } else {
            searchRequest.setCcsMinimizeRoundtrips(
                request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
        }
    }
    // org.elasticsearch.search.builder.SearchSourceBuilder#parseXContent
    /**
     * Parse some xContent into this SearchSourceBuilder, overwriting any values specified in the xContent. Use this if you need to set up
     * different defaults than a regular SearchSourceBuilder would have and use {@link #fromXContent(XContentParser, boolean)} if you have
     * normal defaults.
     *
     * @param parser The xContent parser.
     * @param checkTrailingTokens If true throws a parsing exception when extra tokens are found after the main object.
     */
    public void parseXContent(XContentParser parser, boolean checkTrailingTokens) throws IOException {
        XContentParser.Token token = parser.currentToken();
        String currentFieldName = null;
        if (token != XContentParser.Token.START_OBJECT && (token = parser.nextToken()) != XContentParser.Token.START_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.START_OBJECT +
                    "] but found [" + token + "]", parser.getTokenLocation());
        }
        // 循环解析直到结束
        while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
            } else if (token.isValue()) {
                if (FROM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    from = parser.intValue();
                } else if (SIZE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    size = parser.intValue();
                } else if (TIMEOUT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    timeout = TimeValue.parseTimeValue(parser.text(), null, TIMEOUT_FIELD.getPreferredName());
                } else if (TERMINATE_AFTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    terminateAfter = parser.intValue();
                } else if (MIN_SCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    minScore = parser.floatValue();
                } else if (VERSION_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    version = parser.booleanValue();
                } else if (SEQ_NO_PRIMARY_TERM_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    seqNoAndPrimaryTerm = parser.booleanValue();
                } else if (EXPLAIN_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    explain = parser.booleanValue();
                } else if (TRACK_SCORES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    trackScores = parser.booleanValue();
                } else if (TRACK_TOTAL_HITS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    if (token == XContentParser.Token.VALUE_BOOLEAN ||
                        (token == XContentParser.Token.VALUE_STRING && Booleans.isBoolean(parser.text()))) {
                        trackTotalHitsUpTo = parser.booleanValue() ? TRACK_TOTAL_HITS_ACCURATE : TRACK_TOTAL_HITS_DISABLED;
                    } else {
                        trackTotalHitsUpTo = parser.intValue();
                    }
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    storedFieldsContext =
                        StoredFieldsContext.fromXContent(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), parser);
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sort(parser.text());
                } else if (PROFILE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    profile = parser.booleanValue();
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            } else if (token == XContentParser.Token.START_OBJECT) {
                // 解析query 参数
                if (QUERY_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    queryBuilder = parseInnerQueryBuilder(parser);
                } else if (POST_FILTER_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    postQueryBuilder = parseInnerQueryBuilder(parser);
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (SCRIPT_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    scriptFields = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        scriptFields.add(new ScriptField(parser));
                    }
                } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    deprecationLogger.deprecate("indices_boost_object_format",
                        "Object format in indices_boost is deprecated, please use array format instead");
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            currentFieldName = parser.currentName();
                        } else if (token.isValue()) {
                            indexBoosts.add(new IndexBoost(currentFieldName, parser.floatValue()));
                        } else {
                            throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token +
                                " in [" + currentFieldName + "].", parser.getTokenLocation());
                        }
                    }
                } else if (AGGREGATIONS_FIELD.match(currentFieldName, parser.getDeprecationHandler())
                        || AGGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    aggregations = AggregatorFactories.parseAggregators(parser);
                } else if (HIGHLIGHT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    highlightBuilder = HighlightBuilder.fromXContent(parser);
                } else if (SUGGEST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    suggestBuilder = SuggestBuilder.fromXContent(parser);
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    rescoreBuilders = new ArrayList<>();
                    rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                } else if (EXT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    extBuilders = new ArrayList<>();
                    String extSectionName = null;
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            extSectionName = parser.currentName();
                        } else {
                            SearchExtBuilder searchExtBuilder = parser.namedObject(SearchExtBuilder.class, extSectionName, null);
                            if (searchExtBuilder.getWriteableName().equals(extSectionName) == false) {
                                throw new IllegalStateException("The parsed [" + searchExtBuilder.getClass().getName() + "] object has a "
                                        + "different writeable name compared to the name of the section that it was parsed from: found ["
                                        + searchExtBuilder.getWriteableName() + "] expected [" + extSectionName + "]");
                            }
                            extBuilders.add(searchExtBuilder);
                        }
                    }
                } else if (SLICE.match(currentFieldName, parser.getDeprecationHandler())) {
                    sliceBuilder = SliceBuilder.fromXContent(parser);
                } else if (COLLAPSE.match(currentFieldName, parser.getDeprecationHandler())) {
                    collapse = CollapseBuilder.fromXContent(parser);
                } else if (POINT_IN_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
                    pointInTimeBuilder = PointInTimeBuilder.fromXContent(parser);
                } else if (RUNTIME_MAPPINGS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    runtimeMappings = parser.map();
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            } else if (token == XContentParser.Token.START_ARRAY) {
                if (STORED_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    storedFieldsContext = StoredFieldsContext.fromXContent(STORED_FIELDS_FIELD.getPreferredName(), parser);
                } else if (DOCVALUE_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    docValueFields = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        docValueFields.add(FieldAndFormat.fromXContent(parser));
                    }
                } else if (FETCH_FIELDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchFields = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        fetchFields.add(FieldAndFormat.fromXContent(parser));
                    }
                } else if (INDICES_BOOST_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        indexBoosts.add(new IndexBoost(parser));
                    }
                } else if (SORT_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    sorts = new ArrayList<>(SortBuilder.fromXContent(parser));
                } else if (RESCORE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    rescoreBuilders = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        rescoreBuilders.add(RescorerBuilder.parseFromXContent(parser));
                    }
                } else if (STATS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    stats = new ArrayList<>();
                    while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                        if (token == XContentParser.Token.VALUE_STRING) {
                            stats.add(parser.text());
                        } else {
                            throw new ParsingException(parser.getTokenLocation(), "Expected [" + XContentParser.Token.VALUE_STRING +
                                    "] in [" + currentFieldName + "] but found [" + token + "]", parser.getTokenLocation());
                        }
                    }
                } else if (_SOURCE_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    fetchSourceContext = FetchSourceContext.fromXContent(parser);
                } else if (SEARCH_AFTER.match(currentFieldName, parser.getDeprecationHandler())) {
                    searchAfterBuilder = SearchAfterBuilder.fromXContent(parser);
                } else {
                    throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                            parser.getTokenLocation());
                }
            } else {
                throw new ParsingException(parser.getTokenLocation(), "Unknown key for a " + token + " in [" + currentFieldName + "].",
                        parser.getTokenLocation());
            }
        }
        // 解析完成,token被使用完
        if (checkTrailingTokens) {
            token = parser.nextToken();
            if (token != null) {
                throw new ParsingException(parser.getTokenLocation(), "Unexpected token [" + token + "] found after the main object.");
            }
        }
    }
    // org.elasticsearch.index.query.AbstractQueryBuilder#parseInnerQueryBuilder
    /**
     * Parses a query excluding the query element that wraps it
     */
    public static QueryBuilder parseInnerQueryBuilder(XContentParser parser) throws IOException {
        if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
            if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
                throw new ParsingException(parser.getTokenLocation(), "[_na] query malformed, must start with start_object");
            }
        }
        if (parser.nextToken() == XContentParser.Token.END_OBJECT) {
            // we encountered '{}' for a query clause, it used to be supported, deprecated in 5.0 and removed in 6.0
            throw new IllegalArgumentException("query malformed, empty clause found at [" + parser.getTokenLocation() +"]");
        }
        if (parser.currentToken() != XContentParser.Token.FIELD_NAME) {
            throw new ParsingException(parser.getTokenLocation(), "[_na] query malformed, no field after start_object");
        }
        String queryName = parser.currentName();
        // move to the next START_OBJECT
        if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(), "[" + queryName + "] query malformed, no start_object after query name");
        }
        QueryBuilder result;
        try {
            result = parser.namedObject(QueryBuilder.class, queryName, null);
        } catch (NamedObjectNotFoundException e) {
            String message = String.format(Locale.ROOT, "unknown query [%s]%s", queryName,
                    SuggestingErrorOnUnknown.suggest(queryName, e.getCandidates()));
            throw new ParsingException(new XContentLocation(e.getLineNumber(), e.getColumnNumber()), message, e);
        }
        //end_object of the specific query (e.g. match, multi_match etc.) element
        if (parser.currentToken() != XContentParser.Token.END_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(),
                    "[" + queryName + "] malformed query, expected [END_OBJECT] but found [" + parser.currentToken() + "]");
        }
        //end_object of the query object
        if (parser.nextToken() != XContentParser.Token.END_OBJECT) {
            throw new ParsingException(parser.getTokenLocation(),
                    "[" + queryName + "] malformed query, expected [END_OBJECT] but found [" + parser.currentToken() + "]");
        }
        return result;
    }
    // org.elasticsearch.rest.action.search.RestSearchAction#parseSearchSource
    /**
     * Parses the rest request on top of the SearchSourceBuilder, preserving
     * values that are not overridden by the rest request.
     */
    private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {
        // ?q=xx 格式的搜索
        QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
        if (queryBuilder != null) {
            searchSourceBuilder.query(queryBuilder);
        }

        int from = request.paramAsInt("from", -1);
        if (from != -1) {
            searchSourceBuilder.from(from);
        }
        int size = request.paramAsInt("size", -1);
        if (size != -1) {
            setSize.accept(size);
        }

        if (request.hasParam("explain")) {
            searchSourceBuilder.explain(request.paramAsBoolean("explain", null));
        }
        if (request.hasParam("version")) {
            searchSourceBuilder.version(request.paramAsBoolean("version", null));
        }
        if (request.hasParam("seq_no_primary_term")) {
            searchSourceBuilder.seqNoAndPrimaryTerm(request.paramAsBoolean("seq_no_primary_term", null));
        }
        if (request.hasParam("timeout")) {
            searchSourceBuilder.timeout(request.paramAsTime("timeout", null));
        }
        if (request.hasParam("terminate_after")) {
            int terminateAfter = request.paramAsInt("terminate_after",
                    SearchContext.DEFAULT_TERMINATE_AFTER);
            if (terminateAfter < 0) {
                throw new IllegalArgumentException("terminateAfter must be > 0");
            } else if (terminateAfter > 0) {
                searchSourceBuilder.terminateAfter(terminateAfter);
            }
        }

        StoredFieldsContext storedFieldsContext =
            StoredFieldsContext.fromRestRequest(SearchSourceBuilder.STORED_FIELDS_FIELD.getPreferredName(), request);
        if (storedFieldsContext != null) {
            searchSourceBuilder.storedFields(storedFieldsContext);
        }
        String sDocValueFields = request.param("docvalue_fields");
        if (sDocValueFields != null) {
            if (Strings.hasText(sDocValueFields)) {
                String[] sFields = Strings.splitStringByCommaToArray(sDocValueFields);
                for (String field : sFields) {
                    searchSourceBuilder.docValueField(field, null);
                }
            }
        }
        FetchSourceContext fetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
        if (fetchSourceContext != null) {
            searchSourceBuilder.fetchSource(fetchSourceContext);
        }

        if (request.hasParam("track_scores")) {
            searchSourceBuilder.trackScores(request.paramAsBoolean("track_scores", false));
        }


        if (request.hasParam("track_total_hits")) {
            if (Booleans.isBoolean(request.param("track_total_hits"))) {
                searchSourceBuilder.trackTotalHits(
                    request.paramAsBoolean("track_total_hits", true)
                );
            } else {
                searchSourceBuilder.trackTotalHitsUpTo(
                    request.paramAsInt("track_total_hits", SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO)
                );
            }
        }

        String sSorts = request.param("sort");
        if (sSorts != null) {
            String[] sorts = Strings.splitStringByCommaToArray(sSorts);
            for (String sort : sorts) {
                int delimiter = sort.lastIndexOf(":");
                if (delimiter != -1) {
                    String sortField = sort.substring(0, delimiter);
                    String reverse = sort.substring(delimiter + 1);
                    if ("asc".equals(reverse)) {
                        searchSourceBuilder.sort(sortField, SortOrder.ASC);
                    } else if ("desc".equals(reverse)) {
                        searchSourceBuilder.sort(sortField, SortOrder.DESC);
                    }
                } else {
                    searchSourceBuilder.sort(sort);
                }
            }
        }

        String sStats = request.param("stats");
        if (sStats != null) {
            searchSourceBuilder.stats(Arrays.asList(Strings.splitStringByCommaToArray(sStats)));
        }

        String suggestField = request.param("suggest_field");
        if (suggestField != null) {
            String suggestText = request.param("suggest_text", request.param("q"));
            int suggestSize = request.paramAsInt("suggest_size", 5);
            String suggestMode = request.param("suggest_mode");
            searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(suggestField,
                    termSuggestion(suggestField)
                        .text(suggestText).size(suggestSize)
                        .suggestMode(SuggestMode.resolve(suggestMode))));
        }
    }
    // org.elasticsearch.rest.action.RestActions#urlParamsToQueryBuilder
    public static QueryBuilder urlParamsToQueryBuilder(RestRequest request) {
        String queryString = request.param("q");
        if (queryString == null) {
            return null;
        }
        QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(queryString);
        queryBuilder.defaultField(request.param("df"));
        queryBuilder.analyzer(request.param("analyzer"));
        queryBuilder.analyzeWildcard(request.paramAsBoolean("analyze_wildcard", false));
        queryBuilder.lenient(request.paramAsBoolean("lenient", null));
        String defaultOperator = request.param("default_operator");
        if (defaultOperator != null) {
            queryBuilder.defaultOperator(Operator.fromString(defaultOperator));
        }
        return queryBuilder;
    }
2.2. search请求的分发

最终执行search时,由 RestCancellableNodeClient 进行execute, 并通过 SearchAction.INSTANCE 查找到处理器 TransportSearchAction . 但此任务,仍是在做前期工作,当前client即会作为协调节点,它知当前语义如何,需要将请求拆解、分发给各节点,或者自行使用异步线程处理。

// org.elasticsearch.action.search.TransportSearchAction#doExecute
    @Override
    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        executeRequest(task, searchRequest, this::searchAsyncAction, listener);
    }
    private void executeRequest(Task task, SearchRequest searchRequest,
                                SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
            new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            // 重点逻辑
            final ClusterState clusterState = clusterService.state();
            final SearchContextId searchContext;
            final Map<String, OriginalIndices> remoteClusterIndices;
            if (searchRequest.pointInTimeBuilder() != null) {
                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
            } else {
                searchContext = null;
                // 获取远程索引
                remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
                    searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
            }
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            if (remoteClusterIndices.isEmpty()) {
                // 本地执行 搜索
                executeLocalSearch(
                    task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
            } else {
                if (shouldMinimizeRoundtrips(searchRequest)) {
                    final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
                    ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
                        searchService.aggReduceContextBuilder(searchRequest),
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(
                            task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    // 更多的是需要收集许多shard数据
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
                                    getRemoteClusterNodeLookup(searchShardsResponses);
                                final Map<String, AliasFilter> remoteAliasFilters;
                                final List<SearchShardIterator> remoteShardIterators;
                                if (searchContext != null) {
                                    remoteAliasFilters = searchContext.aliasFilter();
                                    remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
                                        searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
                                } else {
                                    remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
                                    remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
                                        remoteAliasFilters);
                                }
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
                                    clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
                                    searchContext, searchAsyncActionProvider);
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        // 调用 rewriteListener, 此处source 代表所有输入的搜索条件参数 (json)
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }
    // org.elasticsearch.index.query.Rewriteable#rewriteAndFetch
    /**
     * Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
     */
    static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T> rewriteResponse) {
        rewriteAndFetch(original, context, rewriteResponse, 0);
    }
    /**
     * Rewrites the given rewriteable and fetches pending async tasks for each round before rewriting again.
     */
    static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context, ActionListener<T>
        rewriteResponse, int iteration) {
        T builder = original;
        try {
            for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
                 rewrittenBuilder = builder.rewrite(context)) {
                builder = rewrittenBuilder;
                if (iteration++ >= MAX_REWRITE_ROUNDS) {
                    // this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
                    // and then we fail to prevent infinite loops
                    throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
                        "rewritten");
                }
                if (context.hasAsyncActions()) {
                    T finalBuilder = builder;
                    final int currentIterationNumber = iteration;
                    context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                        currentIterationNumber), rewriteResponse::onFailure));
                    return;
                }
            }
            rewriteResponse.onResponse(builder);
        } catch (IOException|IllegalArgumentException|ParsingException ex) {
            rewriteResponse.onFailure(ex);
        }
    }

以上就是大致的搜索过程接入,看着着实有点累吧。先来看看单节点搜索实现框架:

// org.elasticsearch.action.search.TransportSearchAction#executeLocalSearch
    private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
                                    ClusterState clusterState, ActionListener<SearchResponse> listener,
                                    SearchContextId searchContext,
                                    SearchAsyncActionProvider searchAsyncActionProvider) {
        executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
            (clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY,
            searchContext, searchAsyncActionProvider);
    }
    // org.elasticsearch.action.search.TransportSearchAction#executeSearch
    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                               OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
                               BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
                               Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
                               SearchResponse.Clusters clusters, @Nullable SearchContextId searchContext,
                               SearchAsyncActionProvider searchAsyncActionProvider) {

        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);

        // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
        // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
        // of just for the _search api
        final List<SearchShardIterator> localShardIterators;
        final Map<String, AliasFilter> aliasFilter;

        final String[] concreteLocalIndices;
        if (searchContext != null) {
            assert searchRequest.pointInTimeBuilder() != null;
            aliasFilter = searchContext.aliasFilter();
            concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices();
            localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices,
                searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive());
        } else {
            // 解析索引
            final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
            // 解析路由
            Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
                searchRequest.indices());
            routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
            concreteLocalIndices = new String[indices.length];
            for (int i = 0; i < indices.length; i++) {
                concreteLocalIndices[i] = indices[i].getName();
            }
            Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
            GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting().searchShards(clusterState,
                concreteLocalIndices, routingMap, searchRequest.preference(),
                searchService.getResponseCollectorService(), nodeSearchCounts);
            localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
                .map(it -> new SearchShardIterator(
                    searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
                .collect(Collectors.toList());
            // 别名过滤器
            aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
        }
        // 分片迭代器
        final GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators);
        // 检查shard数量是否超限
        failIfOverShardCountLimit(clusterService, shardIterators.size());

        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

        // optimize search type for cases where there is only one shard group to search on
        // 搜索优化:一个shard时,查找完成一个立即返回
        if (shardIterators.size() == 1) {
            // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
            searchRequest.searchType(QUERY_THEN_FETCH);
        }
        if (searchRequest.allowPartialSearchResults() == null) {
            // No user preference defined in search request - apply cluster service default
            searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
        }
        if (searchRequest.isSuggestOnly()) {
            // disable request cache if we have only suggest
            searchRequest.requestCache(false);
            switch (searchRequest.searchType()) {
                case DFS_QUERY_THEN_FETCH:
                    // convert to Q_T_F if we have only suggest
                    searchRequest.searchType(QUERY_THEN_FETCH);
                    break;
            }
        }
        final DiscoveryNodes nodes = clusterState.nodes();
        // 索引搜索连接管理
        BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
            nodes::get, remoteConnections, searchTransportService::getConnection);
        // 获取专门用于搜索的线程池
        final Executor asyncSearchExecutor = asyncSearchExecutor(concreteLocalIndices, clusterState);
        final boolean preFilterSearchShards = shouldPreFilterSearchShards(clusterState, searchRequest, concreteLocalIndices,
            localShardIterators.size() + remoteShardIterators.size());
        // 构造异步请求处理器,开启search
        searchAsyncActionProvider.asyncSearchAction(
            task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState,
            Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener,
            preFilterSearchShards, threadPool, clusters).start();
    }

主要就是检查索引、分片、线程池等,搞好之后提到异步执行去了。而分布式系统高性能的秘诀就是在这里,难点也是在这里,难以排查跟踪。

下面我们简单看看多节点搜索时,需要收集结果,其过程大致如下:

// org.elasticsearch.action.search.TransportSearchAction#collectSearchShards
    static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
                                    Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
                                    ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
        final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
        final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
        final AtomicReference<Exception> exceptions = new AtomicReference<>();
        for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
            final String clusterAlias = entry.getKey();
            boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
            Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
            final String[] indices = entry.getValue().indices();
            ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
                .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
            clusterClient.admin().cluster().searchShards(searchShardsRequest,
                new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
                    clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
                    @Override
                    void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                        searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
                    }

                    @Override
                    Map<String, ClusterSearchShardsResponse> createFinalResponse() {
                        return searchShardsResponses;
                    }
                }
            );
        }
    }

然后其中有许多处理shard, 索引的细节,感兴趣的自行深入。

// org.elasticsearch.cluster.metadata.IndexNameExpressionResolver#resolveSearchRouting
    /**
     * Resolves the search routing if in the expression aliases are used. If expressions point to concrete indices
     * or aliases with no routing defined the specified routing is used.
     *
     * @return routing values grouped by concrete index
     */
    public Map<String, Set<String>> resolveSearchRouting(ClusterState state, @Nullable String routing, String... expressions) {
        List<String> resolvedExpressions = expressions != null ? Arrays.asList(expressions) : Collections.emptyList();
        Context context = new Context(state, IndicesOptions.lenientExpandOpen(), false, false, true, isSystemIndexAccessAllowed());
        for (ExpressionResolver expressionResolver : expressionResolvers) {
            resolvedExpressions = expressionResolver.resolve(context, resolvedExpressions);
        }

        // TODO: it appears that this can never be true?
        if (isAllIndices(resolvedExpressions)) {
            return resolveSearchRoutingAllIndices(state.metadata(), routing);
        }

        Map<String, Set<String>> routings = null;
        Set<String> paramRouting = null;
        // List of indices that don't require any routing
        Set<String> norouting = new HashSet<>();
        if (routing != null) {
            paramRouting = Sets.newHashSet(Strings.splitStringByCommaToArray(routing));
        }

        for (String expression : resolvedExpressions) {
            IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(expression);
            if (indexAbstraction != null && indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) {
                IndexAbstraction.Alias alias = (IndexAbstraction.Alias) indexAbstraction;
                for (Tuple<String, AliasMetadata> item : alias.getConcreteIndexAndAliasMetadatas()) {
                    String concreteIndex = item.v1();
                    AliasMetadata aliasMetadata = item.v2();
                    if (!norouting.contains(concreteIndex)) {
                        if (!aliasMetadata.searchRoutingValues().isEmpty()) {
                            // Routing alias
                            if (routings == null) {
                                routings = new HashMap<>();
                            }
                            Set<String> r = routings.get(concreteIndex);
                            if (r == null) {
                                r = new HashSet<>();
                                routings.put(concreteIndex, r);
                            }
                            r.addAll(aliasMetadata.searchRoutingValues());
                            if (paramRouting != null) {
                                r.retainAll(paramRouting);
                            }
                            if (r.isEmpty()) {
                                routings.remove(concreteIndex);
                            }
                        } else {
                            // Non-routing alias
                            if (!norouting.contains(concreteIndex)) {
                                norouting.add(concreteIndex);
                                if (paramRouting != null) {
                                    Set<String> r = new HashSet<>(paramRouting);
                                    if (routings == null) {
                                        routings = new HashMap<>();
                                    }
                                    routings.put(concreteIndex, r);
                                } else {
                                    if (routings != null) {
                                        routings.remove(concreteIndex);
                                    }
                                }
                            }
                        }
                    }
                }
            } else {
                // Index
                if (!norouting.contains(expression)) {
                    norouting.add(expression);
                    if (paramRouting != null) {
                        Set<String> r = new HashSet<>(paramRouting);
                        if (routings == null) {
                            routings = new HashMap<>();
                        }
                        routings.put(expression, r);
                    } else {
                        if (routings != null) {
                            routings.remove(expression);
                        }
                    }
                }
            }

        }
        if (routings == null || routings.isEmpty()) {
            return null;
        }
        return routings;
    }
    // org.elasticsearch.cluster.routing.OperationRouting#searchShards
    public GroupShardsIterator<ShardIterator> searchShards(ClusterState clusterState,
                                                           String[] concreteIndices,
                                                           @Nullable Map<String, Set<String>> routing,
                                                           @Nullable String preference,
                                                           @Nullable ResponseCollectorService collectorService,
                                                           @Nullable Map<String, Long> nodeCounts) {
        final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
        final Set<ShardIterator> set = new HashSet<>(shards.size());
        for (IndexShardRoutingTable shard : shards) {
            ShardIterator iterator = preferenceActiveShardIterator(shard,
                    clusterState.nodes().getLocalNodeId(), clusterState.nodes(), preference, collectorService, nodeCounts);
            if (iterator != null) {
                set.add(iterator);
            }
        }
        return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
    }
    // org.elasticsearch.cluster.routing.OperationRouting#computeTargetedShards
    private Set<IndexShardRoutingTable> computeTargetedShards(ClusterState clusterState, String[] concreteIndices,
                                                              @Nullable Map<String, Set<String>> routing) {
        routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map
        final Set<IndexShardRoutingTable> set = new HashSet<>();
        // we use set here and not list since we might get duplicates
        for (String index : concreteIndices) {
            final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
            final IndexMetadata indexMetadata = indexMetadata(clusterState, index);
            final Set<String> effectiveRouting = routing.get(index);
            if (effectiveRouting != null) {
                for (String r : effectiveRouting) {
                    final int routingPartitionSize = indexMetadata.getRoutingPartitionSize();
                    for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) {
                        set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset)));
                    }
                }
            } else {
                for (IndexShardRoutingTable indexShard : indexRouting) {
                    set.add(indexShard);
                }
            }
        }
        return set;
    }

之后便使用 asyncExecutor执行start() 开启搜索分发。

// org.elasticsearch.action.search.TransportSearchAction#buildConnectionLookup
    static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
                                                              Function<String, DiscoveryNode> localNodes,
                                                              BiFunction<String, String, DiscoveryNode> remoteNodes,
                                                              BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
        return (clusterAlias, nodeId) -> {
            final DiscoveryNode discoveryNode;
            final boolean remoteCluster;
            if (clusterAlias == null || requestClusterAlias != null) {
                assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
                discoveryNode = localNodes.apply(nodeId);
                remoteCluster = false;
            } else {
                discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
                remoteCluster = true;
            }
            if (discoveryNode == null) {
                throw new IllegalStateException("no node found for id: " + nodeId);
            }
            return nodeToConnection.apply(remoteCluster ? clusterAlias : null, discoveryNode);
        };
    }
    // org.elasticsearch.action.search.TransportSearchAction#asyncSearchExecutor
    Executor asyncSearchExecutor(final String[] indices, final ClusterState clusterState) {
        final boolean onlySystemIndices = Arrays.stream(indices)
            .allMatch(index -> {
                final IndexMetadata indexMetadata = clusterState.metadata().index(index);
                return indexMetadata != null && indexMetadata.isSystem();
            });
        return onlySystemIndices ? threadPool.executor(ThreadPool.Names.SYSTEM_READ) : threadPool.executor(ThreadPool.Names.SEARCH);
    }


    // org.elasticsearch.action.search.AbstractSearchAsyncAction#start
    /**
     * This is the main entry point for a search. This method starts the search execution of the initial phase.
     */
    public final void start() {
        if (getNumShards() == 0) {
            //no search shards to search on, bail with empty response
            //(it happens with search across _all with no indices around and consistent with broadcast operations)
            int trackTotalHitsUpTo = request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
                request.source().trackTotalHitsUpTo() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO :
                    request.source().trackTotalHitsUpTo();
            // total hits is null in the response if the tracking of total hits is disabled
            boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
            listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),
                ShardSearchFailure.EMPTY_ARRAY, clusters, null));
            return;
        }
        executePhase(this);
    }
    // org.elasticsearch.action.search.AbstractSearchAsyncAction#executePhase
    private void executePhase(SearchPhase phase) {
        try {
            phase.run();
        } catch (Exception e) {
            if (logger.isDebugEnabled()) {
                logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
            }
            onPhaseFailure(phase, "", e);
        }
    }

最终 search 会以异步run的形式到达,并迭代shard运行。

// org.elasticsearch.action.search.AbstractSearchAsyncAction#run
    @Override
    public final void run() {
        for (final SearchShardIterator iterator : toSkipShardsIts) {
            assert iterator.skip();
            skipShard(iterator);
        }
        if (shardsIts.size() > 0) {
            assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
            if (request.allowPartialSearchResults() == false) {
                final StringBuilder missingShards = new StringBuilder();
                // Fail-fast verification of all shards being available
                for (int index = 0; index < shardsIts.size(); index++) {
                    final SearchShardIterator shardRoutings = shardsIts.get(index);
                    if (shardRoutings.size() == 0) {
                        if(missingShards.length() > 0){
                            missingShards.append(", ");
                        }
                        missingShards.append(shardRoutings.shardId());
                    }
                }
                if (missingShards.length() > 0) {
                    //Status red - shard is missing all copies and would produce partial results for an index search
                    final String msg = "Search rejected due to missing shards ["+ missingShards +
                        "]. Consider using `allow_partial_search_results` setting to bypass this error.";
                    throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
                }
            }
            // 多个shard运行搜索
            for (int i = 0; i < shardsIts.size(); i++) {
                final SearchShardIterator shardRoutings = shardsIts.get(i);
                assert shardRoutings.skip() == false;
                assert shardItIndexMap.containsKey(shardRoutings);
                int shardIndex = shardItIndexMap.get(shardRoutings);
                performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
            }
        }
    }
    // org.elasticsearch.action.search.AbstractSearchAsyncAction#performPhaseOnShard
    protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
        /*
         * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
         * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
         * continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and
         * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
         * we can continue (cf. InitialSearchPhase#maybeFork).
         */
        if (shard == null) {
            SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
                shardIt.getClusterAlias(), shardIt.getOriginalIndices());
            fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
        } else {
            final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
                pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
                : null;
            Runnable r = () -> {
                final Thread thread = Thread.currentThread();
                try {
                    // 在单shard上搜索
                    executePhaseOnShard(shardIt, shard,
                        new SearchActionListener<Result>(shard, shardIndex) {
                            @Override
                            public void innerOnResponse(Result result) {
                                try {
                                    onShardResult(result, shardIt);
                                } catch (Exception exc) {
                                    onShardFailure(shardIndex, shard, shardIt, exc);
                                } finally {
                                    executeNext(pendingExecutions, thread);
                                }
                            }

                            @Override
                            public void onFailure(Exception t) {
                                try {
                                    onShardFailure(shardIndex, shard, shardIt, t);
                                } finally {
                                    executeNext(pendingExecutions, thread);
                                }
                            }
                        });
                } catch (final Exception e) {
                    try {
                        /*
                         * It is possible to run into connection exceptions here because we are getting the connection early and might
                         * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
                         */
                        fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
                    } finally {
                        executeNext(pendingExecutions, thread);
                    }
                }
            };
            // 排队或立即运行
            if (throttleConcurrentRequests) {
                pendingExecutions.tryRun(r);
            } else {
                r.run();
            }
        }
    }
    // org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction#executePhaseOnShard
    protected void executePhaseOnShard(final SearchShardIterator shardIt,
                                       final SearchShardTarget shard,
                                       final SearchActionListener<SearchPhaseResult> listener) {
        // 构造search请求
        ShardSearchRequest request = rewriteShardSearchRequest(
                super.buildShardSearchRequest(shardIt, listener.requestIndex));
           // 发送 search 请求到对应节点,而对于本地节点则mock一个connection, 直接执行本地搜索
        getSearchTransport()
            .sendExecuteQuery(
                getConnection(shard.getClusterAlias(), shard.getNodeId()), 
                request, 
                getTask(), 
                listener);
    }
    // org.elasticsearch.action.search.AbstractSearchAsyncAction#buildShardSearchRequest
    @Override
    public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) {
        AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
        assert filter != null;
        float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
        ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), shardIndex,
            getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(),
            shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
        // if we already received a search result we can inform the shard that it
        // can return a null response if the request rewrites to match none rather
        // than creating an empty response in the search thread pool.
        // Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
        shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);
        return shardRequest;
    }

发送请求过程之前我们细细见识过,此处可忽略。

// org.elasticsearch.action.search.SearchTransportService#sendExecuteQuery
    public void sendExecuteQuery(Transport.Connection connection, final ShardSearchRequest request, SearchTask task,
                                 final SearchActionListener<SearchPhaseResult> listener) {
        // we optimize this and expect a QueryFetchSearchResult if we only have a single shard in the search request
        // this used to be the QUERY_AND_FETCH which doesn't exist anymore.
        final boolean fetchDocuments = request.numberOfShards() == 1;
        Writeable.Reader<SearchPhaseResult> reader = fetchDocuments ? QueryFetchSearchResult::new : QuerySearchResult::new;

        final ActionListener handler = responseWrapper.apply(connection, listener);
        transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
                new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()));
    }
    // org.elasticsearch.transport.TransportService#sendChildRequest
    public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportResponseHandler<T> handler) {
        sendChildRequest(connection, action, request, parentTask, TransportRequestOptions.EMPTY, handler);
    }
    // org.elasticsearch.transport.TransportService#sendChildRequest
    public <T extends TransportResponse> void sendChildRequest(final Transport.Connection connection, final String action,
                                                               final TransportRequest request, final Task parentTask,
                                                               final TransportRequestOptions options,
                                                               final TransportResponseHandler<T> handler) {
        request.setParentTask(localNode.getId(), parentTask.getId());
        sendRequest(connection, action, request, options, handler);
    }
    // org.elasticsearch.transport.TransportService#sendRequest
    /**
     * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
     *
     * @param connection the connection to send the request on
     * @param action     the name of the action
     * @param request    the request
     * @param options    the options for this request
     * @param handler    the response handler
     * @param <T>        the type of the transport response
     */
    public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                final TransportResponseHandler<T> handler) {
        try {
            final TransportResponseHandler<T> delegate;
            if (request.getParentTask().isSet()) {
                // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
                // child task on the target node of the remote cluster.
                //  ----> a parent task on the local cluster
                //        |
                //         ----> a proxy task on the proxy node on the remote cluster
                //               |
                //                ----> an actual child task on the target node on the remote cluster
                // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
                // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
                // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
                final Transport.Connection unwrappedConn = unwrapConnection(connection);
                final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
                delegate = new TransportResponseHandler<T>() {
                    @Override
                    public void handleResponse(T response) {
                        unregisterChildNode.close();
                        handler.handleResponse(response);
                    }

                    @Override
                    public void handleException(TransportException exp) {
                        unregisterChildNode.close();
                        handler.handleException(exp);
                    }

                    @Override
                    public String executor() {
                        return handler.executor();
                    }

                    @Override
                    public T read(StreamInput in) throws IOException {
                        return handler.read(in);
                    }

                    @Override
                    public String toString() {
                        return getClass().getName() + "/[" + action + "]:" + handler.toString();
                    }
                };
            } else {
                delegate = handler;
            }
            asyncSender.sendRequest(connection, action, request, options, delegate);
        } catch (final Exception ex) {
            // the caller might not handle this so we invoke the handler
            final TransportException te;
            if (ex instanceof TransportException) {
                te = (TransportException) ex;
            } else {
                te = new TransportException("failure to send", ex);
            }
            handler.handleException(te);
        }
    }
    // org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor#interceptSender
    @Override
    public AsyncSender interceptSender(AsyncSender sender) {
        return new AsyncSender() {
            @Override
            public <T extends TransportResponse> void sendRequest(Transport.Connection connection, String action, TransportRequest request,
                                                                  TransportRequestOptions options, TransportResponseHandler<T> handler) {
                final boolean requireAuth = shouldRequireExistingAuthentication();
                // the transport in core normally does this check, BUT since we are serializing to a string header we need to do it
                // ourselves otherwise we wind up using a version newer than what we can actually send
                final Version minVersion = Version.min(connection.getVersion(), Version.CURRENT);

                // Sometimes a system action gets executed like a internal create index request or update mappings request
                // which means that the user is copied over to system actions so we need to change the user
                if (AuthorizationUtils.shouldReplaceUserWithSystem(threadPool.getThreadContext(), action)) {
                    securityContext.executeAsUser(SystemUser.INSTANCE, (original) -> sendWithUser(connection, action, request, options,
                            new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
                                    , handler), sender, requireAuth), minVersion);
                } else if (AuthorizationUtils.shouldSetUserBasedOnActionOrigin(threadPool.getThreadContext())) {
                    AuthorizationUtils.switchUserBasedOnActionOriginAndExecute(threadPool.getThreadContext(), securityContext,
                            (original) -> sendWithUser(connection, action, request, options,
                                    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original)
                                            , handler), sender, requireAuth));
                } else if (securityContext.getAuthentication() != null &&
                        securityContext.getAuthentication().getVersion().equals(minVersion) == false) {
                    // re-write the authentication since we want the authentication version to match the version of the connection
                    securityContext.executeAfterRewritingAuthentication(original -> sendWithUser(connection, action, request, options,
                        new ContextRestoreResponseHandler<>(threadPool.getThreadContext().wrapRestorable(original), handler), sender,
                        requireAuth), minVersion);
                } else {
                    sendWithUser(connection, action, request, options, handler, sender, requireAuth);
                }
            }
        };
    }
    // org.elasticsearch.transport.TransportService#sendRequestInternal
    private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler) {
        if (connection == null) {
            throw new IllegalStateException("can't send request to a null connection");
        }
        DiscoveryNode node = connection.getNode();

        Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
        ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
        // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
        final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
        final TimeoutHandler timeoutHandler;
        if (options.timeout() != null) {
            timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
            responseHandler.setTimeoutHandler(timeoutHandler);
        } else {
            timeoutHandler = null;
        }
        try {
            if (lifecycle.stoppedOrClosed()) {
                /*
                 * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                 * caller. It will only notify if toStop hasn't done the work yet.
                 */
                throw new NodeClosedException(localNode);
            }
            if (timeoutHandler != null) {
                assert options.timeout() != null;
                timeoutHandler.scheduleTimeout(options.timeout());
            }
            connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
        } catch (final Exception e) {
            // usually happen either because we failed to connect to the node
            // or because we failed serializing the message
            final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
            // If holderToNotify == null then handler has already been taken care of.
            if (contextToNotify != null) {
                if (timeoutHandler != null) {
                    timeoutHandler.cancel();
                }
                // callback that an exception happened, but on a different thread since we don't
                // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
                // thread on a best effort basis though.
                final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
                final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    public void onRejection(Exception e) {
                        // if we get rejected during node shutdown we don't wanna bubble it up
                        logger.debug(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on rejection, action: {}",
                                contextToNotify.action()),
                            e);
                    }
                    @Override
                    public void onFailure(Exception e) {
                        logger.warn(
                            () -> new ParameterizedMessage(
                                "failed to notify response handler on exception, action: {}",
                                contextToNotify.action()),
                            e);
                    }
                    @Override
                    protected void doRun() throws Exception {
                        contextToNotify.handler().handleException(sendRequestException);
                    }
                });
            } else {
                logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
            }
        }
    }
        // org.elasticsearch.transport.Transport.Connection#sendRequest
        @Override
        public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
            throws TransportException {
            sendLocalRequest(requestId, action, request, options);
        }
    // org.elasticsearch.transport.TransportService#sendLocalRequest
    private void sendLocalRequest(long requestId, final String action, final TransportRequest request, TransportRequestOptions options) {
        final DirectResponseChannel channel = new DirectResponseChannel(localNode, action, requestId, this, threadPool);
        try {
            onRequestSent(localNode, requestId, action, request, options);
            onRequestReceived(requestId, action);
            final RequestHandlerRegistry reg = getRequestHandler(action);
            if (reg == null) {
                throw new ActionNotFoundTransportException("Action [" + action + "] not found");
            }
            final String executor = reg.getExecutor();
            if (ThreadPool.Names.SAME.equals(executor)) {
                //noinspection unchecked
                // ...
                reg.processMessageReceived(request, channel);
            } else {
                threadPool.executor(executor).execute(new AbstractRunnable() {
                    @Override
                    protected void doRun() throws Exception {
                        //noinspection unchecked
                        reg.processMessageReceived(request, channel);
                    }

                    @Override
                    public boolean isForceExecution() {
                        return reg.isForceExecution();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        try {
                            channel.sendResponse(e);
                        } catch (Exception inner) {
                            inner.addSuppressed(e);
                            logger.warn(() -> new ParameterizedMessage(
                                    "failed to notify channel of error message for action [{}]", action), inner);
                        }
                    }

                    @Override
                    public String toString() {
                        return "processing of [" + requestId + "][" + action + "]: " + request;
                    }
                });
            }

        } catch (Exception e) {
            try {
                channel.sendResponse(e);
            } catch (Exception inner) {
                inner.addSuppressed(e);
                logger.warn(
                    () -> new ParameterizedMessage(
                        "failed to notify channel of error message for action [{}]", action), inner);
            }
        }
    }

3. 单节点的搜索实现

前面许多动作,都是在prepare. 但最终,始终要落到lucene上搜索才行。它将由 SearchService 执行搜索动作。

总体时时序图如下:

具体代码实现:

// org.elasticsearch.search.SearchService#executeQueryPhase
    public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInContext,
                                  SearchShardTask task, ActionListener<SearchPhaseResult> listener) {
        assert request.canReturnNullResponseIfMatchNoDocs() == false || request.numberOfShards() > 1
            : "empty responses require more than one shard";
        final IndexShard shard = getShard(request);
        rewriteAndFetchShardRequest(shard, request, new ActionListener<ShardSearchRequest>() {
            @Override
            public void onResponse(ShardSearchRequest orig) {
                // check if we can shortcut the query phase entirely.
                if (orig.canReturnNullResponseIfMatchNoDocs()) {
                    assert orig.scroll() == null;
                    final CanMatchResponse canMatchResp;
                    try {
                        ShardSearchRequest clone = new ShardSearchRequest(orig);
                        canMatchResp = canMatch(clone, false);
                    } catch (Exception exc) {
                        listener.onFailure(exc);
                        return;
                    }
                    if (canMatchResp.canMatch == false) {
                        listener.onResponse(QuerySearchResult.nullInstance());
                        return;
                    }
                }
                // fork the execution in the search thread pool
                // 异步运行  executor.execute()
                runAsync(getExecutor(shard), () -> executeQueryPhase(orig, task, keepStatesInContext), listener);
            }

            @Override
            public void onFailure(Exception exc) {
                listener.onFailure(exc);
            }
        });
    }
    // org.elasticsearch.search.SearchService#executeQueryPhase
    private SearchPhaseResult executeQueryPhase(ShardSearchRequest request,
                                                SearchShardTask task,
                                                boolean keepStatesInContext) throws Exception {
        final ReaderContext readerContext = createOrGetReaderContext(request, keepStatesInContext);
        try (Releasable ignored = readerContext.markAsUsed(getKeepAlive(request));
                // 创建上下文,参数信息,此处将重新解析请求参数
                SearchContext context = createContext(readerContext, request, task, true)) {
            final long afterQueryTime;
            try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
                // 执行 search, 得到 DocId 信息,放入context中
                loadOrExecuteQueryPhase(request, context);
                if (context.queryResult().hasSearchContext() == false && readerContext.singleSession()) {
                    freeReaderContext(readerContext.id());
                }
                afterQueryTime = executor.success();
            }
            if (request.numberOfShards() == 1) {
                // 结果集中只存在一个shard, 则可立即查询文档详情
                return executeFetchPhase(readerContext, context, afterQueryTime);
            } else {
                // 否则,需要再做排序操作后再查询文档详情
                // Pass the rescoreDocIds to the queryResult to send them the coordinating node and receive them back in the fetch phase.
                // We also pass the rescoreDocIds to the LegacyReaderContext in case the search state needs to stay in the data node.
                final RescoreDocIds rescoreDocIds = context.rescoreDocIds();
                context.queryResult().setRescoreDocIds(rescoreDocIds);
                readerContext.setRescoreDocIds(rescoreDocIds);
                return context.queryResult();
            }
        } catch (Exception e) {
            // execution exception can happen while loading the cache, strip it
            if (e instanceof ExecutionException) {
                e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                    (Exception) e.getCause() : new ElasticsearchException(e.getCause());
            }
            logger.trace("Query phase failed", e);
            processFailure(readerContext, e);
            throw e;
        }
    }

    // org.elasticsearch.search.SearchService#loadOrExecuteQueryPhase
    /**
     * Try to load the query results from the cache or execute the query phase directly if the cache cannot be used.
     */
    private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
        final boolean canCache = indicesService.canCache(request, context);
        context.getQueryShardContext().freezeContext();
        if (canCache) {
            indicesService.loadIntoContext(request, context, queryPhase);
        } else {
            queryPhase.execute(context);
        }
    }

以上即是单节点搜索框架,查询DocId, 然后视情况查询doc详情。说来倒也简单,只是许多细节,需要知晓。

首先,创建上下文的过程,主要是解析现有参数,备后续使用,以及后续结果也将存入上下文中。按需展开。

// org.elasticsearch.search.SearchService#createContext
    final SearchContext createContext(ReaderContext readerContext,
                                      ShardSearchRequest request,
                                      SearchShardTask task,
                                      boolean includeAggregations) throws IOException {
        final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout);
        try {
            if (request.scroll() != null) {
                context.scrollContext().scroll = request.scroll();
            }
            // 解析参数
            parseSource(context, request.source(), includeAggregations);

            // if the from and size are still not set, default them
            // DEFAULT_FROM=0
            if (context.from() == -1) {
                context.from(DEFAULT_FROM);
            }
            // DEFAULT_SIZE=10
            if (context.size() == -1) {
                context.size(DEFAULT_SIZE);
            }
            context.setTask(task);

            // pre process
            queryPhase.preProcess(context);
        } catch (Exception e) {
            context.close();
            throw e;
        }

        return context;
    }
    // org.elasticsearch.search.SearchService#parseSource
    private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
        // nothing to parse...
        if (source == null) {
            return;
        }
        SearchShardTarget shardTarget = context.shardTarget();
        QueryShardContext queryShardContext = context.getQueryShardContext();
        context.from(source.from());
        context.size(source.size());
        Map<String, InnerHitContextBuilder> innerHitBuilders = new HashMap<>();
        // 解析query
        if (source.query() != null) {
            InnerHitContextBuilder.extractInnerHits(source.query(), innerHitBuilders);
            context.parsedQuery(queryShardContext.toQuery(source.query()));
        }
        // 解析 post_filter
        if (source.postFilter() != null) {
            InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders);
            context.parsedPostFilter(queryShardContext.toQuery(source.postFilter()));
        }
        if (innerHitBuilders.size() > 0) {
            for (Map.Entry<String, InnerHitContextBuilder> entry : innerHitBuilders.entrySet()) {
                try {
                    entry.getValue().build(context, context.innerHits());
                } catch (IOException e) {
                    throw new SearchException(shardTarget, "failed to build inner_hits", e);
                }
            }
        }
        if (source.sorts() != null) {
            try {
                Optional<SortAndFormats> optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext());
                if (optionalSort.isPresent()) {
                    context.sort(optionalSort.get());
                }
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create sort elements", e);
            }
        }
        context.trackScores(source.trackScores());
        if (source.trackTotalHitsUpTo() != null
                && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE
                && context.scrollContext() != null) {
            throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context");
        }
        if (source.trackTotalHitsUpTo() != null) {
            context.trackTotalHitsUpTo(source.trackTotalHitsUpTo());
        }
        if (source.minScore() != null) {
            context.minimumScore(source.minScore());
        }
        if (source.profile()) {
            context.setProfilers(new Profilers(context.searcher()));
        }
        if (source.timeout() != null) {
            context.timeout(source.timeout());
        }
        context.terminateAfter(source.terminateAfter());
        // 聚合解析
        if (source.aggregations() != null && includeAggregations) {
            AggregationContext aggContext = new ProductionAggregationContext(
                context.getQueryShardContext(),
                /*
                 * The query on the search context right now doesn't include
                 * the filter for nested documents or slicing so we have to
                 * delay reading it until the aggs ask for it.
                 */
                () -> context.query() == null ? new MatchAllDocsQuery() : context.query(),
                context.getProfilers() == null ? null : context.getProfilers().getAggregationProfiler(),
                multiBucketConsumerService.create(),
                () -> new SubSearchContext(context).parsedQuery(context.parsedQuery()).fetchFieldsContext(context.fetchFieldsContext()),
                context::addReleasable,
                context.bitsetFilterCache(),
                context.indexShard().shardId().hashCode(),
                context::getRelativeTimeInMillis,
                context::isCancelled
            );
            try {
                AggregatorFactories factories = source.aggregations().build(aggContext, null);
                context.aggregations(new SearchContextAggregations(factories));
            } catch (IOException e) {
                throw new AggregationInitializationException("Failed to create aggregators", e);
            }
        }
        if (source.suggest() != null) {
            try {
                context.suggest(source.suggest().build(queryShardContext));
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e);
            }
        }
        if (source.rescores() != null) {
            try {
                for (RescorerBuilder<?> rescore : source.rescores()) {
                    context.addRescore(rescore.buildContext(queryShardContext));
                }
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e);
            }
        }
        if (source.explain() != null) {
            context.explain(source.explain());
        }
        if (source.fetchSource() != null) {
            context.fetchSourceContext(source.fetchSource());
        }
        if (source.docValueFields() != null) {
            FetchDocValuesContext docValuesContext = new FetchDocValuesContext(context.getQueryShardContext(), source.docValueFields());
            context.docValuesContext(docValuesContext);
        }
        if (source.fetchFields() != null) {
            FetchFieldsContext fetchFieldsContext = new FetchFieldsContext(source.fetchFields());
            context.fetchFieldsContext(fetchFieldsContext);
        }
        if (source.highlighter() != null) {
            HighlightBuilder highlightBuilder = source.highlighter();
            try {
                context.highlight(highlightBuilder.build(queryShardContext));
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create SearchContextHighlighter", e);
            }
        }
        if (source.scriptFields() != null && source.size() != 0) {
            int maxAllowedScriptFields = queryShardContext.getIndexSettings().getMaxScriptFields();
            if (source.scriptFields().size() > maxAllowedScriptFields) {
                throw new IllegalArgumentException(
                        "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxAllowedScriptFields
                                + "] but was [" + source.scriptFields().size() + "]. This limit can be set by changing the ["
                                + IndexSettings.MAX_SCRIPT_FIELDS_SETTING.getKey() + "] index level setting.");
            }
            for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
                FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT);
                SearchLookup lookup = context.getQueryShardContext().lookup();
                FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup);
                context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
            }
        }
        if (source.ext() != null) {
            for (SearchExtBuilder searchExtBuilder : source.ext()) {
                context.addSearchExt(searchExtBuilder);
            }
        }
        if (source.version() != null) {
            context.version(source.version());
        }

        if (source.seqNoAndPrimaryTerm() != null) {
            context.seqNoAndPrimaryTerm(source.seqNoAndPrimaryTerm());
        }

        if (source.stats() != null) {
            context.groupStats(source.stats());
        }
        if (CollectionUtils.isEmpty(source.searchAfter()) == false) {
            if (context.scrollContext() != null) {
                throw new SearchException(shardTarget, "`search_after` cannot be used in a scroll context.");
            }
            if (context.from() > 0) {
                throw new SearchException(shardTarget, "`from` parameter must be set to 0 when `search_after` is used.");
            }
            FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
            context.searchAfter(fieldDoc);
        }

        if (source.slice() != null) {
            if (context.scrollContext() == null) {
                throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
            }
            context.sliceBuilder(source.slice());
        }

        if (source.storedFields() != null) {
            if (source.storedFields().fetchFields() == false) {
                if (context.sourceRequested()) {
                    throw new SearchException(shardTarget, "[stored_fields] cannot be disabled if [_source] is requested");
                }
                if (context.fetchFieldsContext() != null) {
                    throw new SearchException(shardTarget, "[stored_fields] cannot be disabled when using the [fields] option");
                }
            }
            context.storedFieldsContext(source.storedFields());
        }

        if (source.collapse() != null) {
            if (context.scrollContext() != null) {
                throw new SearchException(shardTarget, "cannot use `collapse` in a scroll context");
            }
            if (context.searchAfter() != null) {
                throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `search_after`");
            }
            if (context.rescore() != null && context.rescore().isEmpty() == false) {
                throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `rescore`");
            }
            final CollapseContext collapseContext = source.collapse().build(queryShardContext);
            context.collapse(collapseContext);
        }
    }

以上即是单节点搜索框架,查询DocId, 然后视情况查询doc详情。说来倒也简单,只是许多细节,需要知晓。

首先,创建上下文的过程,主要是解析现有参数,备后续使用,以及后续结果也将存入上下文中。按需展开。

// org.elasticsearch.search.SearchService#createContext
    final SearchContext createContext(ReaderContext readerContext,
                                      ShardSearchRequest request,
                                      SearchShardTask task,
                                      boolean includeAggregations) throws IOException {
        final DefaultSearchContext context = createSearchContext(readerContext, request, defaultSearchTimeout);
        try {
            if (request.scroll() != null) {
                context.scrollContext().scroll = request.scroll();
            }
            // 解析参数
            parseSource(context, request.source(), includeAggregations);

            // if the from and size are still not set, default them
            // DEFAULT_FROM=0
            if (context.from() == -1) {
                context.from(DEFAULT_FROM);
            }
            // DEFAULT_SIZE=10
            if (context.size() == -1) {
                context.size(DEFAULT_SIZE);
            }
            context.setTask(task);

            // pre process
            queryPhase.preProcess(context);
        } catch (Exception e) {
            context.close();
            throw e;
        }

        return context;
    }
    // org.elasticsearch.search.SearchService#parseSource
    private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
        // nothing to parse...
        if (source == null) {
            return;
        }
        SearchShardTarget shardTarget = context.shardTarget();
        QueryShardContext queryShardContext = context.getQueryShardContext();
        context.from(source.from());
        context.size(source.size());
        Map<String, InnerHitContextBuilder> innerHitBuilders = new HashMap<>();
        // 解析query
        if (source.query() != null) {
            InnerHitContextBuilder.extractInnerHits(source.query(), innerHitBuilders);
            context.parsedQuery(queryShardContext.toQuery(source.query()));
        }
        // 解析 post_filter
        if (source.postFilter() != null) {
            InnerHitContextBuilder.extractInnerHits(source.postFilter(), innerHitBuilders);
            context.parsedPostFilter(queryShardContext.toQuery(source.postFilter()));
        }
        if (innerHitBuilders.size() > 0) {
            for (Map.Entry<String, InnerHitContextBuilder> entry : innerHitBuilders.entrySet()) {
                try {
                    entry.getValue().build(context, context.innerHits());
                } catch (IOException e) {
                    throw new SearchException(shardTarget, "failed to build inner_hits", e);
                }
            }
        }
        if (source.sorts() != null) {
            try {
                Optional<SortAndFormats> optionalSort = SortBuilder.buildSort(source.sorts(), context.getQueryShardContext());
                if (optionalSort.isPresent()) {
                    context.sort(optionalSort.get());
                }
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create sort elements", e);
            }
        }
        context.trackScores(source.trackScores());
        if (source.trackTotalHitsUpTo() != null
                && source.trackTotalHitsUpTo() != SearchContext.TRACK_TOTAL_HITS_ACCURATE
                && context.scrollContext() != null) {
            throw new SearchException(shardTarget, "disabling [track_total_hits] is not allowed in a scroll context");
        }
        if (source.trackTotalHitsUpTo() != null) {
            context.trackTotalHitsUpTo(source.trackTotalHitsUpTo());
        }
        if (source.minScore() != null) {
            context.minimumScore(source.minScore());
        }
        if (source.profile()) {
            context.setProfilers(new Profilers(context.searcher()));
        }
        if (source.timeout() != null) {
            context.timeout(source.timeout());
        }
        context.terminateAfter(source.terminateAfter());
        // 聚合解析
        if (source.aggregations() != null && includeAggregations) {
            AggregationContext aggContext = new ProductionAggregationContext(
                context.getQueryShardContext(),
                /*
                 * The query on the search context right now doesn't include
                 * the filter for nested documents or slicing so we have to
                 * delay reading it until the aggs ask for it.
                 */
                () -> context.query() == null ? new MatchAllDocsQuery() : context.query(),
                context.getProfilers() == null ? null : context.getProfilers().getAggregationProfiler(),
                multiBucketConsumerService.create(),
                () -> new SubSearchContext(context).parsedQuery(context.parsedQuery()).fetchFieldsContext(context.fetchFieldsContext()),
                context::addReleasable,
                context.bitsetFilterCache(),
                context.indexShard().shardId().hashCode(),
                context::getRelativeTimeInMillis,
                context::isCancelled
            );
            try {
                AggregatorFactories factories = source.aggregations().build(aggContext, null);
                context.aggregations(new SearchContextAggregations(factories));
            } catch (IOException e) {
                throw new AggregationInitializationException("Failed to create aggregators", e);
            }
        }
        if (source.suggest() != null) {
            try {
                context.suggest(source.suggest().build(queryShardContext));
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create SuggestionSearchContext", e);
            }
        }
        if (source.rescores() != null) {
            try {
                for (RescorerBuilder<?> rescore : source.rescores()) {
                    context.addRescore(rescore.buildContext(queryShardContext));
                }
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create RescoreSearchContext", e);
            }
        }
        if (source.explain() != null) {
            context.explain(source.explain());
        }
        if (source.fetchSource() != null) {
            context.fetchSourceContext(source.fetchSource());
        }
        if (source.docValueFields() != null) {
            FetchDocValuesContext docValuesContext = new FetchDocValuesContext(context.getQueryShardContext(), source.docValueFields());
            context.docValuesContext(docValuesContext);
        }
        if (source.fetchFields() != null) {
            FetchFieldsContext fetchFieldsContext = new FetchFieldsContext(source.fetchFields());
            context.fetchFieldsContext(fetchFieldsContext);
        }
        if (source.highlighter() != null) {
            HighlightBuilder highlightBuilder = source.highlighter();
            try {
                context.highlight(highlightBuilder.build(queryShardContext));
            } catch (IOException e) {
                throw new SearchException(shardTarget, "failed to create SearchContextHighlighter", e);
            }
        }
        if (source.scriptFields() != null && source.size() != 0) {
            int maxAllowedScriptFields = queryShardContext.getIndexSettings().getMaxScriptFields();
            if (source.scriptFields().size() > maxAllowedScriptFields) {
                throw new IllegalArgumentException(
                        "Trying to retrieve too many script_fields. Must be less than or equal to: [" + maxAllowedScriptFields
                                + "] but was [" + source.scriptFields().size() + "]. This limit can be set by changing the ["
                                + IndexSettings.MAX_SCRIPT_FIELDS_SETTING.getKey() + "] index level setting.");
            }
            for (org.elasticsearch.search.builder.SearchSourceBuilder.ScriptField field : source.scriptFields()) {
                FieldScript.Factory factory = scriptService.compile(field.script(), FieldScript.CONTEXT);
                SearchLookup lookup = context.getQueryShardContext().lookup();
                FieldScript.LeafFactory searchScript = factory.newFactory(field.script().getParams(), lookup);
                context.scriptFields().add(new ScriptField(field.fieldName(), searchScript, field.ignoreFailure()));
            }
        }
        if (source.ext() != null) {
            for (SearchExtBuilder searchExtBuilder : source.ext()) {
                context.addSearchExt(searchExtBuilder);
            }
        }
        if (source.version() != null) {
            context.version(source.version());
        }

        if (source.seqNoAndPrimaryTerm() != null) {
            context.seqNoAndPrimaryTerm(source.seqNoAndPrimaryTerm());
        }

        if (source.stats() != null) {
            context.groupStats(source.stats());
        }
        if (CollectionUtils.isEmpty(source.searchAfter()) == false) {
            if (context.scrollContext() != null) {
                throw new SearchException(shardTarget, "`search_after` cannot be used in a scroll context.");
            }
            if (context.from() > 0) {
                throw new SearchException(shardTarget, "`from` parameter must be set to 0 when `search_after` is used.");
            }
            FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());
            context.searchAfter(fieldDoc);
        }

        if (source.slice() != null) {
            if (context.scrollContext() == null) {
                throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
            }
            context.sliceBuilder(source.slice());
        }

        if (source.storedFields() != null) {
            if (source.storedFields().fetchFields() == false) {
                if (context.sourceRequested()) {
                    throw new SearchException(shardTarget, "[stored_fields] cannot be disabled if [_source] is requested");
                }
                if (context.fetchFieldsContext() != null) {
                    throw new SearchException(shardTarget, "[stored_fields] cannot be disabled when using the [fields] option");
                }
            }
            context.storedFieldsContext(source.storedFields());
        }

        if (source.collapse() != null) {
            if (context.scrollContext() != null) {
                throw new SearchException(shardTarget, "cannot use `collapse` in a scroll context");
            }
            if (context.searchAfter() != null) {
                throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `search_after`");
            }
            if (context.rescore() != null && context.rescore().isEmpty() == false) {
                throw new SearchException(shardTarget, "cannot use `collapse` in conjunction with `rescore`");
            }
            final CollapseContext collapseContext = source.collapse().build(queryShardContext);
            context.collapse(collapseContext);
        }
    }

第二、具体查找docId是由 QueryPhase 实现:

// org.elasticsearch.search.query.QueryPhase#execute
    public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
        if (searchContext.hasOnlySuggest()) {
            suggestPhase.execute(searchContext);
            searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
                    new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
                new DocValueFormat[0]);
            return;
        }

        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
        }

        // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
        // request, preProcess is called on the DFS phase phase, this is why we pre-process them
        // here to make sure it happens during the QUERY phase
           // 几个前置后置处理点,使整体功能齐全
        aggregationPhase.preProcess(searchContext);
        // 执行真正的查询
        boolean rescore = executeInternal(searchContext);

        if (rescore) { // only if we do a regular search
            rescorePhase.execute(searchContext);
        }
        suggestPhase.execute(searchContext);
        aggregationPhase.execute(searchContext);

        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults
                .buildShardResults(searchContext.getProfilers());
            searchContext.queryResult().profileResults(shardResults);
        }
    }
    // org.elasticsearch.search.query.QueryPhase#executeInternal
    /**
     * In a package-private method so that it can be tested without having to
     * wire everything (mapperService, etc.)
     * @return whether the rescoring phase should be executed
     */
    static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException {
        final ContextIndexSearcher searcher = searchContext.searcher();
        SortAndFormats sortAndFormatsForRewrittenNumericSort = null;
        final IndexReader reader = searcher.getIndexReader();
        QuerySearchResult queryResult = searchContext.queryResult();
        queryResult.searchTimedOut(false);
        try {
            queryResult.from(searchContext.from());
            queryResult.size(searchContext.size());
            Query query = searchContext.query();
            assert query == searcher.rewrite(query); // already rewritten

            final ScrollContext scrollContext = searchContext.scrollContext();
            if (scrollContext != null) {
                if (scrollContext.totalHits == null) {
                    // first round
                    assert scrollContext.lastEmittedDoc == null;
                    // there is not much that we can optimize here since we want to collect all
                    // documents in order to get the total number of hits

                } else {
                    final ScoreDoc after = scrollContext.lastEmittedDoc;
                    if (returnsDocsInOrder(query, searchContext.sort())) {
                        // now this gets interesting: since we sort in index-order, we can directly
                        // skip to the desired doc
                        if (after != null) {
                            query = new BooleanQuery.Builder()
                                .add(query, BooleanClause.Occur.MUST)
                                .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                                .build();
                        }
                        // ... and stop collecting after ${size} matches
                        searchContext.terminateAfter(searchContext.size());
                    } else if (canEarlyTerminate(reader, searchContext.sort())) {
                        // now this gets interesting: since the search sort is a prefix of the index sort, we can directly
                        // skip to the desired doc
                        if (after != null) {
                            query = new BooleanQuery.Builder()
                                .add(query, BooleanClause.Occur.MUST)
                                .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
                                .build();
                        }
                    }
                }
            }

            final LinkedList<QueryCollectorContext> collectors = new LinkedList<>();
            // whether the chain contains a collector that filters documents
            boolean hasFilterCollector = false;
            if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
                // add terminate_after before the filter collectors
                // it will only be applied on documents accepted by these filter collectors
                collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }
            if (searchContext.parsedPostFilter() != null) {
                // add post filters before aggregations
                // it will only be applied to top hits
                collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }
            if (searchContext.queryCollectors().isEmpty() == false) {
                // plug in additional collectors, like aggregations
                collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
            }
            if (searchContext.minimumScore() != null) {
                // apply the minimum score after multi collector so we filter aggs as well
                collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
                // this collector can filter documents during the collection
                hasFilterCollector = true;
            }

            CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter = l -> {};
            // try to rewrite numeric or date sort to the optimized distanceFeatureQuery
            if ((searchContext.sort() != null) && SYS_PROP_REWRITE_SORT) {
                Query rewrittenQuery = tryRewriteLongSort(searchContext, searcher.getIndexReader(), query, hasFilterCollector);
                if (rewrittenQuery != null) {
                    query = rewrittenQuery;
                    // modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
                    SortField[] oldSortFields = searchContext.sort().sort.getSort();
                    DocValueFormat[] oldFormats = searchContext.sort().formats;
                    SortField[] newSortFields = new SortField[oldSortFields.length + 1];
                    DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
                    newSortFields[0] = SortField.FIELD_SCORE;
                    newFormats[0] = DocValueFormat.RAW;
                    System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
                    System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
                    sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
                    searchContext.sort(new SortAndFormats(new Sort(newSortFields), newFormats));
                    leafSorter = createLeafSorter(oldSortFields[0]);
                }
            }

            boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
                searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;

            final Runnable timeoutRunnable;
            if (timeoutSet) {
                final long startTime = searchContext.getRelativeTimeInMillis();
                final long timeout = searchContext.timeout().millis();
                final long maxTime = startTime + timeout;
                timeoutRunnable = searcher.addQueryCancellation(() -> {
                    final long time = searchContext.getRelativeTimeInMillis();
                    if (time > maxTime) {
                        throw new TimeExceededException();
                    }
                });
            } else {
                timeoutRunnable = null;
            }

            if (searchContext.lowLevelCancellation()) {
                searcher.addQueryCancellation(() -> {
                    SearchShardTask task = searchContext.getTask();
                    if (task != null && task.isCancelled()) {
                        throw new TaskCancelledException("cancelled");
                    }
                });
            }

            try {
                boolean shouldRescore;
                // if we are optimizing sort and there are no other collectors
                if (sortAndFormatsForRewrittenNumericSort!=null && collectors.size()==0 && searchContext.getProfilers()==null) {
                    shouldRescore = searchWithCollectorManager(searchContext, searcher, query, leafSorter, timeoutSet);
                } else {
                    // search...
                    shouldRescore = searchWithCollector(searchContext, searcher, query, collectors, hasFilterCollector, timeoutSet);
                }

                // if we rewrote numeric long or date sort, restore fieldDocs based on the original sort
                if (sortAndFormatsForRewrittenNumericSort!=null) {
                    searchContext.sort(sortAndFormatsForRewrittenNumericSort); // restore SortAndFormats
                    restoreTopFieldDocs(queryResult, sortAndFormatsForRewrittenNumericSort);
                }

                ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
                if (executor instanceof QueueResizingEsThreadPoolExecutor) {
                    QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
                    queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
                    queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
                }
                return shouldRescore;
            } finally {
                // Search phase has finished, no longer need to check for timeout
                // otherwise aggregation phase might get cancelled.
                if (timeoutRunnable!=null) {
                    searcher.removeQueryCancellation(timeoutRunnable);
                }
            }
        } catch (Exception e) {
            throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e);
        }
    }
    // org.elasticsearch.search.query.QueryPhase#searchWithCollector
    private static boolean searchWithCollector(SearchContext searchContext, ContextIndexSearcher searcher, Query query,
            LinkedList<QueryCollectorContext> collectors, boolean hasFilterCollector, boolean timeoutSet) throws IOException {
        // create the top docs collector last when the other collectors are known
        // 创建docs上下文
        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
        // add the top docs collector, the first collector context in the chain
        collectors.addFirst(topDocsFactory);

        final Collector queryCollector;
        if (searchContext.getProfilers() != null) {
            InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
            searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
            queryCollector = profileCollector;
        } else {
            queryCollector = QueryCollectorContext.createQueryCollector(collectors);
        }
        QuerySearchResult queryResult = searchContext.queryResult();
        try {
            // 调用lucene接口,执行真正的查询
            searcher.search(query, queryCollector);
        } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
            queryResult.terminatedEarly(true);
        } catch (TimeExceededException e) {
            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
            if (searchContext.request().allowPartialSearchResults() == false) {
                // Can't rethrow TimeExceededException because not serializable
                throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
            }
            queryResult.searchTimedOut(true);
        }
        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER && queryResult.terminatedEarly() == null) {
            queryResult.terminatedEarly(false);
        }
        for (QueryCollectorContext ctx : collectors) {
            ctx.postProcess(queryResult);
        }
        return topDocsFactory.shouldRescore();
    }

更多。。。

// org.elasticsearch.search.query.TopDocsCollectorContext#createTopDocsCollectorContext
    /**
     * Creates a {@link TopDocsCollectorContext} from the provided <code>searchContext</code>.
     * @param hasFilterCollector True if the collector chain contains at least one collector that can filters document.
     */
    static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searchContext,
                                                                 boolean hasFilterCollector) throws IOException {
        final IndexReader reader = searchContext.searcher().getIndexReader();
        final Query query = searchContext.query();
        // top collectors don't like a size of 0
        final int totalNumDocs = Math.max(1, reader.numDocs());
        if (searchContext.size() == 0) {
            // no matter what the value of from is
            return new EmptyTopDocsCollectorContext(reader, query, searchContext.sort(),
                searchContext.trackTotalHitsUpTo(), hasFilterCollector);
        } else if (searchContext.scrollContext() != null) {
            // we can disable the tracking of total hits after the initial scroll query
            // since the total hits is preserved in the scroll context.
            int trackTotalHitsUpTo = searchContext.scrollContext().totalHits != null ?
                SearchContext.TRACK_TOTAL_HITS_DISABLED : SearchContext.TRACK_TOTAL_HITS_ACCURATE;
            // no matter what the value of from is
            int numDocs = Math.min(searchContext.size(), totalNumDocs);
            return new ScrollingTopDocsCollectorContext(reader, query, searchContext.scrollContext(),
                searchContext.sort(), numDocs, searchContext.trackScores(), searchContext.numberOfShards(),
                trackTotalHitsUpTo, hasFilterCollector);
        } else if (searchContext.collapse() != null) {
            boolean trackScores = searchContext.sort() == null ? true : searchContext.trackScores();
            int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
            return new CollapsingTopDocsCollectorContext(searchContext.collapse(), searchContext.sort(), numDocs, trackScores);
        } else {
            int numDocs = Math.min(searchContext.from() + searchContext.size(), totalNumDocs);
            final boolean rescore = searchContext.rescore().isEmpty() == false;
            if (rescore) {
                assert searchContext.sort() == null;
                for (RescoreContext rescoreContext : searchContext.rescore()) {
                    numDocs = Math.max(numDocs, rescoreContext.getWindowSize());
                }
            }
            return new SimpleTopDocsCollectorContext(reader, query, searchContext.sort(), searchContext.searchAfter(), numDocs,
                searchContext.trackScores(), searchContext.trackTotalHitsUpTo(), hasFilterCollector) {
                @Override
                boolean shouldRescore() {
                    return rescore;
                }
            };
        }
    }
  // org.apache.lucene.search.IndexSearcher#search
  /** Lower-level search API.
   *
   * <p>{@link LeafCollector#collect(int)} is called for every matching document.
   *
   * @throws BooleanQuery.TooManyClauses If a query would exceed 
   *         {@link BooleanQuery#getMaxClauseCount()} clauses.
   */
  public void search(Query query, Collector results)
    throws IOException {
    query = rewrite(query);
    search(leafContexts, createWeight(query, results.scoreMode(), 1), results);
  }
      // org.elasticsearch.search.internal.ContextIndexSearcher#search
    @Override
    protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
        for (LeafReaderContext ctx : leaves) { // search each subreader
            searchLeaf(ctx, weight, collector);
        }
    }
    // org.elasticsearch.search.internal.ContextIndexSearcher#searchLeaf
    /**
     * Lower-level search API.
     *
     * {@link LeafCollector#collect(int)} is called for every matching document in
     * the provided <code>ctx</code>.
     */
    private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
        cancellable.checkCancelled();
        weight = wrapWeight(weight);
        final LeafCollector leafCollector;
        try {
            leafCollector = collector.getLeafCollector(ctx);
        } catch (CollectionTerminatedException e) {
            // there is no doc of interest in this reader context
            // continue with the following leaf
            return;
        }
        Bits liveDocs = ctx.reader().getLiveDocs();
        BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
        if (liveDocsBitSet == null) {
            BulkScorer bulkScorer = weight.bulkScorer(ctx);
            if (bulkScorer != null) {
                try {
                    bulkScorer.score(leafCollector, liveDocs);
                } catch (CollectionTerminatedException e) {
                    // collection was terminated prematurely
                    // continue with the following leaf
                }
            }
        } else {
            // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
            Scorer scorer = weight.scorer(ctx);
            if (scorer != null) {
                try {
                    intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
                            this.cancellable.isEnabled() ? cancellable::checkCancelled: () -> {});
                } catch (CollectionTerminatedException e) {
                    // collection was terminated prematurely
                    // continue with the following leaf
                }
            }
        }
    }
  // org.apache.lucene.search.BulkScorer#score
  /** Scores and collects all matching documents.
   * @param collector The collector to which all matching documents are passed.
   * @param acceptDocs {@link Bits} that represents the allowed documents to match, or
   *                   {@code null} if they are all allowed to match.
   */
  public void score(LeafCollector collector, Bits acceptDocs) throws IOException {
    final int next = score(collector, acceptDocs, 0, DocIdSetIterator.NO_MORE_DOCS);
    assert next == DocIdSetIterator.NO_MORE_DOCS;
  }
          // org.elasticsearch.search.query.TopDocsCollectorContext.SimpleTopDocsCollectorContext#postProcess
        @Override
        void postProcess(QuerySearchResult result) throws IOException {
            final TopDocsAndMaxScore topDocs = newTopDocs();
            result.topDocs(topDocs, sortAndFormats == null ? null : sortAndFormats.formats);
        }
          // org.elasticsearch.search.query.TopDocsCollectorContext.SimpleTopDocsCollectorContext#newTopDocs
        TopDocsAndMaxScore newTopDocs() {
            TopDocs in = topDocsSupplier.get();
            float maxScore = maxScoreSupplier.get();
            final TopDocs newTopDocs;
            if (in instanceof TopFieldDocs) {
                TopFieldDocs fieldDocs = (TopFieldDocs) in;
                newTopDocs = new TopFieldDocs(totalHitsSupplier.get(), fieldDocs.scoreDocs, fieldDocs.fields);
            } else {
                newTopDocs = new TopDocs(totalHitsSupplier.get(), in.scoreDocs);
            }
            return new TopDocsAndMaxScore(newTopDocs, maxScore);
        }

最后,我们来看下查询到docId后,查询字段信息过程。(一般地,该过程会在所有节点的docId都查找完成之后,由协调节点处理后再进行该阶段操作。但此处,我们相当于走了 QUERY_AND_FETCH 流程,即立即查询结果。)

// org.elasticsearch.search.SearchService#executeFetchPhase
    private QueryFetchSearchResult executeFetchPhase(ReaderContext reader, SearchContext context, long afterQueryTime) {
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
            shortcutDocIdsToLoad(context);
            // 执行查询,将结果写入 context
            fetchPhase.execute(context);
            if (reader.singleSession()) {
                freeReaderContext(reader.id());
            }
            executor.success();
        }
        return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
    }
    // org.elasticsearch.search.SearchService#shortcutDocIdsToLoad
    /**
     * Shortcut ids to load, we load only "from" and up to "size". The phase controller
     * handles this as well since the result is always size * shards for Q_T_F
     */
    private void shortcutDocIdsToLoad(SearchContext context) {
        final int[] docIdsToLoad;
        int docsOffset = 0;
        final Suggest suggest = context.queryResult().suggest();
        int numSuggestDocs = 0;
        final List<CompletionSuggestion> completionSuggestions;
        if (suggest != null && suggest.hasScoreDocs()) {
            completionSuggestions = suggest.filter(CompletionSuggestion.class);
            for (CompletionSuggestion completionSuggestion : completionSuggestions) {
                numSuggestDocs += completionSuggestion.getOptions().size();
            }
        } else {
            completionSuggestions = Collections.emptyList();
        }
        if (context.request().scroll() != null) {
            TopDocs topDocs = context.queryResult().topDocs().topDocs;
            docIdsToLoad = new int[topDocs.scoreDocs.length + numSuggestDocs];
            for (int i = 0; i < topDocs.scoreDocs.length; i++) {
                docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
            }
        } else {
            TopDocs topDocs = context.queryResult().topDocs().topDocs;
            if (topDocs.scoreDocs.length < context.from()) {
                // no more docs...
                docIdsToLoad = new int[numSuggestDocs];
            } else {
                int totalSize = context.from() + context.size();
                docIdsToLoad = new int[Math.min(topDocs.scoreDocs.length - context.from(), context.size()) +
                    numSuggestDocs];
                for (int i = context.from(); i < Math.min(totalSize, topDocs.scoreDocs.length); i++) {
                    docIdsToLoad[docsOffset++] = topDocs.scoreDocs[i].doc;
                }
            }
        }
        for (CompletionSuggestion completionSuggestion : completionSuggestions) {
            for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
                docIdsToLoad[docsOffset++] = option.getDoc().doc;
            }
        }
        context.docIdsToLoad(docIdsToLoad, docIdsToLoad.length);
    }
    // org.elasticsearch.search.fetch.FetchPhase#execute
    public void execute(SearchContext context) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("{}", new SearchContextSourcePrinter(context));
        }

        if (context.isCancelled()) {
            throw new TaskCancelledException("cancelled");
        }

        if (context.docIdsToLoadSize() == 0) {
            // no individual hits to process, so we shortcut
            context.fetchResult().hits(new SearchHits(new SearchHit[0], context.queryResult().getTotalHits(),
                context.queryResult().getMaxScore()));
            return;
        }

        DocIdToIndex[] docs = new DocIdToIndex[context.docIdsToLoadSize()];
        for (int index = 0; index < context.docIdsToLoadSize(); index++) {
            docs[index] = new DocIdToIndex(context.docIdsToLoad()[index], index);
        }
        // make sure that we iterate in doc id order
        Arrays.sort(docs);

        Map<String, Set<String>> storedToRequestedFields = new HashMap<>();
        FieldsVisitor fieldsVisitor = createStoredFieldsVisitor(context, storedToRequestedFields);

        FetchContext fetchContext = new FetchContext(context);

        SearchHit[] hits = new SearchHit[context.docIdsToLoadSize()];

        List<FetchSubPhaseProcessor> processors = getProcessors(context.shardTarget(), fetchContext);
        NestedDocuments nestedDocuments = context.getNestedDocuments();

        int currentReaderIndex = -1;
        LeafReaderContext currentReaderContext = null;
        LeafNestedDocuments leafNestedDocuments = null;
        CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader = null;
        boolean hasSequentialDocs = hasSequentialDocs(docs);
        for (int index = 0; index < context.docIdsToLoadSize(); index++) {
            if (context.isCancelled()) {
                throw new TaskCancelledException("cancelled");
            }
            int docId = docs[index].docId;
            try {
                int readerIndex = ReaderUtil.subIndex(docId, context.searcher().getIndexReader().leaves());
                if (currentReaderIndex != readerIndex) {
                    currentReaderContext = context.searcher().getIndexReader().leaves().get(readerIndex);
                    currentReaderIndex = readerIndex;
                    if (currentReaderContext.reader() instanceof SequentialStoredFieldsLeafReader
                            && hasSequentialDocs && docs.length >= 10) {
                        // All the docs to fetch are adjacent but Lucene stored fields are optimized
                        // for random access and don't optimize for sequential access - except for merging.
                        // So we do a little hack here and pretend we're going to do merges in order to
                        // get better sequential access.
                        SequentialStoredFieldsLeafReader lf = (SequentialStoredFieldsLeafReader) currentReaderContext.reader();
                        fieldReader = lf.getSequentialStoredFieldsReader()::visitDocument;
                    } else {
                        // reader.documet() 查询文档
                        fieldReader = currentReaderContext.reader()::document;
                    }
                    for (FetchSubPhaseProcessor processor : processors) {
                        processor.setNextReader(currentReaderContext);
                    }
                    leafNestedDocuments = nestedDocuments.getLeafNestedDocuments(currentReaderContext);
                }
                assert currentReaderContext != null;
                HitContext hit = prepareHitContext(
                    context,
                    leafNestedDocuments,
                    nestedDocuments::hasNonNestedParent,
                    fieldsVisitor,
                    docId,
                    storedToRequestedFields,
                    currentReaderContext,
                    fieldReader);
                for (FetchSubPhaseProcessor processor : processors) {
                    processor.process(hit);
                }
                hits[docs[index].index] = hit.hit();
            } catch (Exception e) {
                throw new FetchPhaseExecutionException(context.shardTarget(), "Error running fetch phase for doc [" + docId + "]", e);
            }
        }
        if (context.isCancelled()) {
            throw new TaskCancelledException("cancelled");
        }

        TotalHits totalHits = context.queryResult().getTotalHits();
        context.fetchResult().hits(new SearchHits(hits, totalHits, context.queryResult().getMaxScore()));

    }
    // org.elasticsearch.search.fetch.FetchPhase#prepareHitContext
    private HitContext prepareHitContext(SearchContext context,
                                         LeafNestedDocuments nestedDocuments,
                                         Predicate<String> hasNonNestedParent,
                                         FieldsVisitor fieldsVisitor,
                                         int docId,
                                         Map<String, Set<String>> storedToRequestedFields,
                                         LeafReaderContext subReaderContext,
                                         CheckedBiConsumer<Integer, FieldsVisitor, IOException> storedFieldReader) throws IOException {
        if (nestedDocuments.advance(docId - subReaderContext.docBase) == null) {
            return prepareNonNestedHitContext(
                context, fieldsVisitor, docId, storedToRequestedFields, subReaderContext, storedFieldReader);
        } else {
            return prepareNestedHitContext(context, docId, nestedDocuments, hasNonNestedParent, storedToRequestedFields,
                subReaderContext, storedFieldReader);
        }
    }
    // org.elasticsearch.search.fetch.FetchPhase#prepareNonNestedHitContext
    /**
     * Resets the provided {@link HitContext} with information on the current
     * document. This includes the following:
     *   - Adding an initial {@link SearchHit} instance.
     *   - Loading the document source and setting it on {@link HitContext#sourceLookup()}. This
     *     allows fetch subphases that use the hit context to access the preloaded source.
     */
    private HitContext prepareNonNestedHitContext(SearchContext context,
                                                  FieldsVisitor fieldsVisitor,
                                                  int docId,
                                                  Map<String, Set<String>> storedToRequestedFields,
                                                  LeafReaderContext subReaderContext,
                                                  CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader) throws IOException {
        int subDocId = docId - subReaderContext.docBase;
        QueryShardContext queryShardContext = context.getQueryShardContext();
        if (fieldsVisitor == null) {
            SearchHit hit = new SearchHit(docId, null, new Text(queryShardContext.getType()), null, null);
            return new HitContext(hit, subReaderContext, subDocId);
        } else {
            SearchHit hit;
            // 字段填充
            loadStoredFields(context.getQueryShardContext()::getFieldType, queryShardContext.getType(), fieldReader,
                fieldsVisitor, subDocId);
            Uid uid = fieldsVisitor.uid();
            if (fieldsVisitor.fields().isEmpty() == false) {
                Map<String, DocumentField> docFields = new HashMap<>();
                Map<String, DocumentField> metaFields = new HashMap<>();
                fillDocAndMetaFields(context, fieldsVisitor, storedToRequestedFields, docFields, metaFields);
                hit = new SearchHit(docId, uid.id(), new Text(queryShardContext.getType()), docFields, metaFields);
            } else {
                // hit 信息返回
                hit = new SearchHit(docId, uid.id(), new Text(queryShardContext.getType()), emptyMap(), emptyMap());
            }

            HitContext hitContext = new HitContext(hit, subReaderContext, subDocId);
            if (fieldsVisitor.source() != null) {
                // Store the loaded source on the hit context so that fetch subphases can access it.
                // Also make it available to scripts by storing it on the shared SearchLookup instance.
                hitContext.sourceLookup().setSource(fieldsVisitor.source());

                SourceLookup scriptSourceLookup = context.getQueryShardContext().lookup().source();
                scriptSourceLookup.setSegmentAndDocument(subReaderContext, subDocId);
                scriptSourceLookup.setSource(fieldsVisitor.source());
            }
            return hitContext;
        }
    }
    // org.elasticsearch.search.fetch.FetchPhase#loadStoredFields
    private void loadStoredFields(Function<String, MappedFieldType> fieldTypeLookup,
                                  @Nullable String type,
                                  CheckedBiConsumer<Integer, FieldsVisitor, IOException> fieldReader,
                                  FieldsVisitor fieldVisitor, int docId) throws IOException {
        fieldVisitor.reset();
        // org.apache.lucene.index.FilterLeafReader#document
        fieldReader.accept(docId, fieldVisitor);
        fieldVisitor.postProcess(fieldTypeLookup, type);
    }

经过如上过程,es已经搜索得到结果,最后就是将结果响应给客户端了。此过程虽不复杂,却也值得一看。

// org.elasticsearch.rest.action.RestStatusToXContentListener#buildResponse
    @Override
    public RestResponse buildResponse(Response response, XContentBuilder builder) throws Exception {
        assert response.isFragment() == false; //would be nice if we could make default methods final
        response.toXContent(builder, channel.request());
        RestResponse restResponse = new BytesRestResponse(response.status(), builder);
        if (RestStatus.CREATED == restResponse.status()) {
            final String location = extractLocation.apply(response);
            if (location != null) {
                restResponse.addHeader("Location", location);
            }
        }
        return restResponse;
    }
// org.elasticsearch.action.search.SearchResponse#toXContent
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        innerToXContent(builder, params);
        builder.endObject();
        return builder;
    }
    // org.elasticsearch.action.search.SearchResponse#innerToXContent
    public XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
        if (scrollId != null) {
            builder.field(SCROLL_ID.getPreferredName(), scrollId);
        }
        if (pointInTimeId != null) {
            builder.field(POINT_IN_TIME_ID.getPreferredName(), pointInTimeId);
        }
        builder.field(TOOK.getPreferredName(), tookInMillis);
        builder.field(TIMED_OUT.getPreferredName(), isTimedOut());
        if (isTerminatedEarly() != null) {
            builder.field(TERMINATED_EARLY.getPreferredName(), isTerminatedEarly());
        }
        if (getNumReducePhases() != 1) {
            builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
        }
        RestActions.buildBroadcastShardsHeader(builder, params, getTotalShards(), getSuccessfulShards(), getSkippedShards(),
            getFailedShards(), getShardFailures());
        clusters.toXContent(builder, params);
        internalResponse.toXContent(builder, params);
        return builder;
    }

    // org.elasticsearch.action.search.SearchResponseSections#toXContent
    @Override
    public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        hits.toXContent(builder, params);
        if (aggregations != null) {
            aggregations.toXContent(builder, params);
        }
        if (suggest != null) {
            suggest.toXContent(builder, params);
        }
        if (profileResults != null) {
            profileResults.toXContent(builder, params);
        }
        return builder;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(Fields.HITS);
        boolean totalHitAsInt = params.paramAsBoolean(RestSearchAction.TOTAL_HITS_AS_INT_PARAM, false);
        if (totalHitAsInt) {
            long total = totalHits == null ? -1 : totalHits.value;
            builder.field(Fields.TOTAL, total);
        } else if (totalHits != null) {
            builder.startObject(Fields.TOTAL);
            builder.field("value", totalHits.value);
            builder.field("relation", totalHits.relation == Relation.EQUAL_TO ? "eq" : "gte");
            builder.endObject();
        }
        if (Float.isNaN(maxScore)) {
            builder.nullField(Fields.MAX_SCORE);
        } else {
            builder.field(Fields.MAX_SCORE, maxScore);
        }
        builder.field(Fields.HITS);
        builder.startArray();
        for (SearchHit hit : hits) {
            hit.toXContent(builder, params);
        }
        builder.endArray();
        builder.endObject();
        return builder;
    }
    // org.elasticsearch.search.SearchHit#toXContent
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        toInnerXContent(builder, params);
        builder.endObject();
        return builder;
    }
    // org.elasticsearch.search.SearchHit#toInnerXContent
    // public because we render hit as part of completion suggestion option
    public XContentBuilder toInnerXContent(XContentBuilder builder, Params params) throws IOException {
        // For inner_hit hits shard is null and that is ok, because the parent search hit has all this information.
        // Even if this was included in the inner_hit hits this would be the same, so better leave it out.
        if (getExplanation() != null && shard != null) {
            builder.field(Fields._SHARD, shard.getShardId());
            builder.field(Fields._NODE, shard.getNodeIdText());
        }
        if (index != null) {
            builder.field(Fields._INDEX, RemoteClusterAware.buildRemoteIndexName(clusterAlias, index));
        }
        if (type != null) {
            builder.field(Fields._TYPE, type);
        }
        if (id != null) {
            builder.field(Fields._ID, id);
        }
        if (nestedIdentity != null) {
            nestedIdentity.toXContent(builder, params);
        }
        if (version != -1) {
            builder.field(Fields._VERSION, version);
        }

        if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            builder.field(Fields._SEQ_NO, seqNo);
            builder.field(Fields._PRIMARY_TERM, primaryTerm);
        }

        if (Float.isNaN(score)) {
            builder.nullField(Fields._SCORE);
        } else {
            builder.field(Fields._SCORE, score);
        }

        for (DocumentField field : metaFields.values()) {
            // ignore empty metadata fields
            if (field.getValues().size() == 0) {
                continue;
            }
            // _ignored is the only multi-valued meta field
            // TODO: can we avoid having an exception here?
            if (field.getName().equals(IgnoredFieldMapper.NAME)) {
                builder.field(field.getName(), field.getValues());
            } else {
                builder.field(field.getName(), field.<Object>getValue());
            }
        }
        if (source != null) {
            XContentHelper.writeRawField(SourceFieldMapper.NAME, source, builder, params);
        }
        if (documentFields.isEmpty() == false &&
                // ignore fields all together if they are all empty
                documentFields.values().stream()
                    .anyMatch(df -> df.getValues().size() > 0)) {
            builder.startObject(Fields.FIELDS);
            for (DocumentField field : documentFields.values()) {
                if (field.getValues().size() > 0) {
                    field.toXContent(builder, params);
                }
            }
            builder.endObject();
        }
        if (highlightFields != null && !highlightFields.isEmpty()) {
            builder.startObject(Fields.HIGHLIGHT);
            for (HighlightField field : highlightFields.values()) {
                field.toXContent(builder, params);
            }
            builder.endObject();
        }
        sortValues.toXContent(builder, params);
        if (matchedQueries.length > 0) {
            builder.startArray(Fields.MATCHED_QUERIES);
            for (String matchedFilter : matchedQueries) {
                builder.value(matchedFilter);
            }
            builder.endArray();
        }
        if (getExplanation() != null) {
            builder.field(Fields._EXPLANATION);
            buildExplanation(builder, getExplanation());
        }
        if (innerHits != null) {
            builder.startObject(Fields.INNER_HITS);
            for (Map.Entry<String, SearchHits> entry : innerHits.entrySet()) {
                builder.startObject(entry.getKey());
                entry.getValue().toXContent(builder, params);
                builder.endObject();
            }
            builder.endObject();
        }
        return builder;
    }

本文讲了es search的简要框架过程,并就单个节点的搜索细节给了答案。原没有什么,只供各位看官了解了解罢了。若要知大概,只需两个时序图即可,无须废神。


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-06-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JAVA烂猪皮 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.1. 搜索请求的接入
  • 2.2. search请求的分发
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档