首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch查询解析

Elasticsearch查询解析

原创
作者头像
用户1644123
修改2018-07-02 14:47:29
2.5K1
修改2018-07-02 14:47:29
举报

1. 背景

       Elasticsearch(ES)可用于全文检索、日志分析、指标分析、APM等众多场景,而且搭建部署容易,后期弹性扩容、故障处理简单。ES在一定程度上实现了一套系统支持多个场景的希望,大幅度降低使用多套专用系统的运维成本(当然ES不是万能的,不能满足事务等场景)。正是因为其通用性和易用性,ES自2010年发布首个版本以来得到爆发式的发展,广泛应用于各类互联网公司的不同业务场景。

       ES的查询接口具有分布式的数据检索、聚合分析能力,数据检索能力用于支持全文检索、日志分析等场景,如Github平台上的代码搜索、基于ES的各类日志分析服务等;聚合分析能力用于支持指标分析、APM等场景,如监控场景、应用的日活/留存分析等。本文基于ES 5.6.4,主要分析ES的分布式执行框架及查询主体流程,探究ES如何实现分布式查询、数据检索、聚合分析等能力。

2. 分布式查询框架及类型

       ES使用开源的Lucene作为存储引擎,它赋予ES高性能的数据检索能力,但Lucene仅仅是一个单机索引库。ES基于Lucene进行分布式封装,以支持集群管理、分布式查询、聚合分析等功能。从使用的直观感受看,ES按照下图方式实现了分布式查询:

图1  查询基本流程
图1 查询基本流程
  • 查询可发送到任意节点,接收到某查询的节点会作为该查询的协调节点(Coordinating Node)。
  • 协调节点解析查询,向对应数据分片分发查询子任务。
  • 各数据分片检索本地数据并返回协调节点,经汇聚处理后返回用户。

       而从实现角度看,协调节点的调度逻辑实际远比上述流程复杂,不同查询对应的协调节点的处理逻辑有一定差别。下面我们先简单介绍ES中常见的3类查询:

2.1 QUERY_THEN_FETCH

       这是最常用的查询类型,可以完成大多数的分布式查询和聚合分析功能。在这类查询中,协调节点实际需要向其他节点分发两轮任务,也就说前面流程图描述的任务分发阶段(2&3)会有两轮,具体如下:

  • Query Phase:进行分片粒度的数据检索和聚合,注意此轮调度仅返回文档id集合,并不返回实际数据。
    • 协调节点:解析查询后,向目标数据分片发送查询命令。
    • 数据节点:在每个分片内,按照过滤、排序等条件进行分片粒度的文档id检索和数据聚合,返回结果。
  • Fetch Phase:生成最终的检索、聚合结果。
    • 协调节点:归并Query Phase的结果,得到最终的文档id集合和聚合结果,并向目标数据分片发送数据抓取命令。
    • 数据节点:按需抓取实际需要的数据内容。
2.2 QUERY_AND_FETCH

       对于查询仅涉及单个分片的场景,ES会自动对查询流程做优化,在数据节点进行Query Phase的最后,直接执行Fetch操作。此类查询为QUERY_AND_FETCH。通过去除一轮任务调度优化查询性能,优化过程由ES自动完成,用户不感知。

2.3 DFS_QUERY_THEN_FETCH

       这类查询用于解决ES在多分片、少数据量的场景下计算相关度不准确的问题:以TF/IDF算法为例,ES在计算相关度时仅考虑单个分片内的IDF,可能导致查询结果中,类似的文档因为在不同分片而相关度大为不同的问题。此时可以使用此类查询,在QUERY_THEN_FETCH之前再增加一轮任务调度,用于计算分布式的IDF。但通常情况下,局部和全局IDF的差异会随着索引里文档数的增多渐渐消失,在真实世界的数据量下,这个问题几乎没有影响,没有必要使用此类查询增加一轮任务调度的开销。

       关于这类问题的具体描述,可以参考如下文档:

3. 查询执行流程

       本节我们深入到代码层面,以QUERY_THEN_FETCH类型查询为例,捋着代码主线,从实际执行角度分析ES的查询流程。查询流程的代码逻辑可以整体划分为两个部分:

  • 查询入口:ES接收到用户请求后,根据请求分发框架,进入对应接口的处理逻辑。这部分处理对任何ES请求都是类似的。
  • 查询调度:根据查询请求条件,进行查询的Query Phase、Fetch Phase等执行流程,返回查询结果。

       在分析具体的查询处理逻辑之前,我们先介绍查询入口部分,看看用户请求在ES中是如何被分发的。

3.1 查询入口

       ES提供用户Transport和Rest两种接口:用户可以通过ES官方提供的Transport Client访问ES集群,这种接口使用的协议与ES集群内部节点间的通讯协议一致;也可以使用简单易用的Rest接口,直接发送Http请求访问ES集群,由ES完成Rest请求到Transport请求的转换。考虑Rest接口的易用性,以及Rest层极低的额外开销,建议用户直接使用Rest接口。

       以Rest接口为例,查询入口部分的基本流程如下:

图2  查询分发流程
图2 查询分发流程
  • Rest分发

       Rest分发由RestController模块完成。在ES节点启动时,会加载所有内置请求的Rest Action,并把对应请求的Http路径和Rest Action作为<Path, RestXXXAction>二元组注册到RestController中。这样对于任意的Rest请求,RestController模块只需根据Http路径,即可轻松找到对应的Rest Action进行请求分发。RestSearchAction的注册样例如下:

public RestSearchAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(GET, "/_search", this);
    controller.registerHandler(POST, "/_search", this);
    controller.registerHandler(GET, "/{index}/_search", this);
    controller.registerHandler(POST, "/{index}/_search", this);
    controller.registerHandler(GET, "/{index}/{type}/_search", this);
    controller.registerHandler(POST, "/{index}/{type}/_search", this);
}
  • RestSearchAction【Rest层】

       Rest层用于解析Http请求参数,转化为ES内部使用的Transport请求,然后转发给Transport层。其核心逻辑在prepareRequest(...)函数中:

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest();
    request.withContentOrSourceParamParserOrNull(parser ->
        parseSearchRequest(searchRequest, request, parser));

    return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
}
  • Transport分发

       Transport分发由NodeClient完成。在ES节点启动进行ActionModule.setupActions(...)时,会把对应请求的Transport路径和Transport Action作为<Action, TransportXXXAction>二元组注册到NodeClient中。NodeClient向外暴露的各种接口(如bulk/search),实际均通过Action对请求进行分发。

actions.register(BulkAction.INSTANCE.class,
                TransportShardBulkAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
  • TransportSearchAction【Transport层】

       Transport层的doExecute(...)函数是请求处理的核心入口,实现了多数请求处理的主要逻辑。在查询请求中,TransportSearchAction首先负责解析获取查询涉及的具体Index:

indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
                timeProvider.getAbsoluteStartMillis(), localIndices.indices());

        然后结合routing信息、perference信息获取后续用于任务分发的分片信息:

GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
            concreteIndices, routingMap, searchRequest.preference());

        最后生成查询请求的调度类并启动调度执行:

searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup,
                        aliasFilter, concreteIndexBoosts, searchPhaseController, executor, searchRequest, listener, shardIterators,
                        timeProvider, clusterStateVersion, task);

       上述即为查询入口的处理流程,它对任何Rest请求都适用。实际上,除了自带的Rest请求外,ES提供强大的扩展能力,用户可以通过自定义插件实现自己的请求及处理逻辑。此外,ES还支持自定义过滤器Filter,在实际进行Transport层处理前进行统一的预处理工作。

       介绍完查询入口后,下面我们具体介绍查询执行过程中的调度部分。

3.2 查询调度

       调用SearchQueryThenFetchAsyncAction.start(...)之后,查询即进入了以协调节点为中心的查询调度过程,即两个核心阶段Query Phase、Fetch Phase的执行,具体如下面时序图所示。此外,查询调度还包含两个轻量级阶段Expand Phase、Reponse Phase,后面我们按照实际执行顺序,依次介绍他们。

图3  查询调用时序图
图3 查询调用时序图
3.2.1 Query Phase
  • 协调节点

       SearchQueryThenFetchAsyncAction实际是Query Phase的入口,Phase名称由其构造函数体现:

super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
    shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
    request.getMaxConcurrentShardRequests());

       进入Query Phase后,会立即根据并发度参数进行Query任务的分发,具体由祖父类InitialSearchPhase的run(...)函数进行:

for (int index = 0; index < maxConcurrentShardRequests; index++) {
    final SearchShardIterator shardRoutings = shardsIts.get(index);
    assert shardRoutings.skip() == false;
    performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
}

       然后通过SearchTransportService的sendExecuteQuery(...)函数,向具体分片发送Query子任务进行异步执行:

transportService.sendChildRequest(connection, QUERY_ACTION_NAME, request, task,
        new ActionListenerResponseHandler<>(listener, supplier));

       在介绍每个分片的执行逻辑前,我们先提前了解分片执行结果的处理:每个分片在执行完毕Query子任务后,通过节点间通信,回调祖父类InitialSearchPhase的onShardSuccess(...)函数,把查询结果记录在协调节点保存的数组结构results中,并增加计数:

successfulOps.incrementAndGet();
results.consumeResult(result);

       当返回结果的分片数等于预期的总分片数时,协调节点会进入当前Phase的结束处理,启动下一个阶段Fetch Phase的执行。注意,这里有个有意思的地方,ES中只需要一个分片执行成功,即会进行后续Phase处理得到部分结果,当然它会在结果中提示用户实际有多少分片执行成功。

if (xTotalOps == expectedTotalOps) {
    onPhaseDone();  # 参考下面onPhaseDone代码
}
……
public final void onPhaseDone() {
    executeNextPhase(this, getNextPhase(results, this));  # 参考下面getNextPhase代码
}
……
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
    return new FetchSearchPhase(results, searchPhaseController, context);
}
  • 数据节点

       协调节点通过SearchTransportService的sendExecuteQuery(...)函数向目标数据节点发送QUERY_ACTION_NAME类型的查询子任务,通过请求路径QUERY_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeQueryPhase(...):

transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SEARCH,
    new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
            SearchPhaseResult result = searchService.executeQueryPhase(request, (SearchTask)task);
            channel.sendResponse(result);
        }
    });

       数据节点会尝试走canCache分支的Query Phase处理,这样可以利用Cache优化查询,否则走普通执行流程:

private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean canCache = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();
    if (canCache) {
        indicesService.loadIntoContext(request, context, queryPhase);
    } else {
        queryPhase.execute(context);
    }
}

       QueryPhase.execute(...)为数据节点进行Query Phase子任务的核心逻辑,它首先从searchContext中获取查询参数和查询对象query,然后生产处理查询结果的collector,最终调用Lucene的IndexSearcher.search(...)函数进行查询,具体参考下面关键代码。

       这里先简单介绍下query、collector,帮助理解:

        a. query :查询对象用于指定查询条件,比如"host:host001 AND timestamp>1514736000",在分片内进行数据检索。

        b. collector :用于消费检索结果,进行Shard级别的limit N(Top N)、聚合计算等操作。它的实现也较为容易理解,如优先级队列、多层嵌套的hash分桶等。注意这里仅获取排序 或 聚合涉及的字段,source、store等内容需要在Fetch Phase中获取。

# 获取参数和查询对象
queryResult.from(searchContext.from());
queryResult.size(searchContext.size());
Query query = searchContext.query();
……
# 生产处理查询结果的collector
#     limit N对应的collector
collector = TopScoreDocCollector.create(numDocs, after);
……
final List<Collector> subCollectors = new ArrayList<>();
subCollectors.add(collector);
#     聚合分析对应的collector
subCollectors.addAll(searchContext.queryCollectors().values());
collector = MultiCollector.wrap(subCollectors);
……
searcher.search(query, collector);

       另外,如果查询仅涉及一个分片,数据节点会在Query Phase结尾处,直接执行Fetch Phase,即QUERY_AND_FETCH类型查询:

if (request.numberOfShards() == 1) {
    return executeFetchPhase(context, operationListener, afterQueryTime);
}
3.2.2 Fetch Phase
  • 协调节点

       Fetch Phase首先会归并Query Phase得到的文档id集合,并排序得到最终的limit N,同时归并多个分片的聚合数据得到最终的聚合结果。这一步通过reduce操作完成:

final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce();

       然后对需要抓取具体数据的文档id按照分片粒度进行划分,并向对应分片发送抓取请求:

final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, reducedQueryPhase.scoreDocs);
……
for (int i = 0; i < docIdsToLoad.length; i++) {
	……
	executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection);
	……
}

       后续执行逻辑和Query Phase类似,每个分片在执行完毕Query子任务后,通过节点间通信,回调innerOnResponse(...)函数通知协调节点,结果会使用shard id作为下标放入数组结构fetchResults中:

successfulOps.incrementAndGet();
results.consumeResult(result);

       当最后一个分片执行完成后,协调节点会进入当前Phase结束处理:合并fetch阶段的结果集,并启动下一个阶段执行。

final Runnable finishPhase = ()
    -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
    queryResults : fetchResults);  # 参考下面moveToNextPhase代码
……
private void moveToNextPhase(SearchPhaseController searchPhaseController,
                             String scrollId, SearchPhaseController.ReducedQueryPhase reducedQueryPhase,
                             AtomicArray<? extends SearchPhaseResult> fetchResultsArr) {
    final InternalSearchResponse internalResponse = searchPhaseController.merge(context.getRequest().scroll() != null,
        reducedQueryPhase, fetchResultsArr.asList(), fetchResultsArr::get);
    context.executeNextPhase(this, nextPhaseFactory.apply(internalResponse, scrollId));
}

       Fetch Phase的构造函数也向我们展示了后续需要执行的两个简单阶段,后面我们会简要介绍:

FetchSearchPhase(InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> resultConsumer,
                 SearchPhaseController searchPhaseController,
                 SearchPhaseContext context) {
    this(resultConsumer, searchPhaseController, context,
        (response, scrollId) -> new ExpandSearchPhase(context, response, // collapse only happens if the request has inner hits
            (finalResponse) -> sendResponsePhase(finalResponse, scrollId, context)));
}
  • 数据节点

       协调节点通过SearchTransportService的sendExecuteFetch(...)函数向目标数据节点发送Transport路径为FETCH_ID_ACTION_NAME的查询子任务,通过FETCH_ID_ACTION_NAME可以在SearchTransportService中找到对应的处理函数SearchService.executeFetchPhase(...):

transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
    new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
        @Override
        public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
            FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
            channel.sendResponse(result);
        }
    });

       然后进入实际的Fetch处理逻辑FetchPhase.execute(...),在这里fetchSubPhases是一系列简单的抓取任务,会按照docid抓取对应文档的source、store fields、highlight、docvalue fields等信息:

for (FetchSubPhase fetchSubPhase : fetchSubPhases) {
    fetchSubPhase.hitsExecute(context, hits);
}
3.2.3 Expand Phase

       在Fetch Phase协调节点处理的结束阶段,我们看到下一个执行阶段为Expand Phase,用于完成ES 5.3版本以后支持的Field Collapsing查询。通过该类查询可以轻松实现按Field值进行分类,每个分类获取排名前N的文档。如在餐厅的菜单系统中按菜系(川菜、湘菜等)分类,获取每个菜系排名前3的美食。用户也可以按Field进行Aggregation实现类似功能,但Field Collapsing会更易用、高效。

       Field Collapsing属于一类特殊的查询场景,这里不详细介绍。

3.2.4 Response Phase

       Expand Phase的下一执行阶段为Response Phase,用于将查询结果返回用户:

private static SearchPhase sendResponsePhase(InternalSearchResponse response, String scrollId, SearchPhaseContext context) {
    return new SearchPhase("response") {
        @Override
        public void run() throws IOException {
            context.onResponse(context.buildSearchResponse(response, scrollId));
        }
    };
}

4. 小结

       本文主要分析了ES的分布式执行框架及查询主体流程,对ES其它他流程及Lucene相关内容未做详细介绍,后续我们会通过具体文章详细介绍,欢迎大家一起交流讨论。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 背景
  • 2. 分布式查询框架及类型
    • 2.1 QUERY_THEN_FETCH
      • 2.2 QUERY_AND_FETCH
        • 2.3 DFS_QUERY_THEN_FETCH
        • 3. 查询执行流程
          • 3.1 查询入口
            • 3.2 查询调度
              • 3.2.1 Query Phase
                • 3.2.2 Fetch Phase
                  • 3.2.3 Expand Phase
                    • 3.2.4 Response Phase
                    • 4. 小结
                    相关产品与服务
                    Elasticsearch Service
                    腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档