前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ES5.6 Bulk源码解析

ES5.6 Bulk源码解析

作者头像
YG
发布2018-05-23 17:07:18
9681
发布2018-05-23 17:07:18
举报
文章被收录于专栏:YG小书屋
Bulk注册

在启动类BootStrap的start()方法中,启动了node.start()方法。在Node初始化的过程中,加载了一系列的模块和插件,其中包含ActionModel。

代码语言:javascript
复制
 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
 modules.add(actionModule);

在ActionModel中,注册了我们常用的一些操作action,比如说我们这次解析的BulkAction:

代码语言:javascript
复制
  actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
  actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,TransportShardMultiGetAction.class);
  actions.register(BulkAction.INSTANCE, TransportBulkAction.class,TransportShardBulkAction.class);

并且初始化RestHandler:

代码语言:javascript
复制
 registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
 registerHandler.accept(new RestBulkAction(settings, restController));
 registerHandler.accept(new RestUpdateAction(settings, restController));

在RestBulkAction中规定了我们的查询方式:

代码语言:javascript
复制
  controller.registerHandler(POST, "/_bulk", this);
  controller.registerHandler(PUT, "/_bulk", this);
  controller.registerHandler(POST, "/{index}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/_bulk", this);
  controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);
接收到请求

RestBulkAction在prepareRequest方法中将我们普通的RestRequest转化为BulkReqest,并通过NodeClient调用:

代码语言:javascript
复制
 channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));

而在NodeClient的bulk中则是调用了NodeClient的doExecute()方法。

代码语言:javascript
复制
doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener)

传入的Action是BulkAction.Instance,request就是上一步封装的BulkRequest,listener则是监听器。

在doExecute方法中,首先将普通的action转化为tansportAction,然后用转化后的tansportAction执行该请求:

代码语言:javascript
复制
transportAction(action).execute(request, listener);

bulkAction转化后变为TransportBulkAction,而TransportBulkAction的execute方法则是调用本身的doExecute()方法。在doExecut()方法中首先将存在和不存在的索引分类:

代码语言:javascript
复制
1)Step 1: collect all the indices in the request
2)Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create that we'll use when we try to run the requests.
3)Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.

然后执行executeBulk()方法,接着在executeBulk中创建一个BulkOperation,并开始执行该BulkOperation:

代码语言:javascript
复制
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener,
        final AtomicArray<BulkItemResponse> responses, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

在BulkOperation中存在两次遍历Bulk中所有的请求,第一次遍历则将给该请求设置Routing,Mapping等等,如果允许产生ID,则自动生成ID。第二次遍历则是根据shardID将请求分类。ES官网有说到批量处理时让用bulk,原因是bulk处理请求时做了一些底层的优化。这就是一个优化点,将同一个shard的请求集合在一起直接发送到节点对应的shard,避免请求在节点间传递,影响效率。

代码语言:javascript
复制
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    ....
    switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            ....
    }
....
}

....
for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
             }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
         }

然后针对不同的shardRequest,分别用shardBulkAction处理:

代码语言:javascript
复制
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {}
每个shard的处理流程

接下来就是复杂的类继承关系了:

代码语言:javascript
复制
TransportShardBulkAction>TransportWriteAction >TransportReplicationAction>TransportAction

上一步的shardBulkAction.execute()方法则是执行的TransportAction的execute方法。我看得源码版本是5.6版本的,与5.0版本相比,ES增加了一个 TransportWriteAction类,而且在TransportReplicationAction不是直接运行run方法,而是通过transportService的RPC接口在实现功能。具体的流程如下:

1)TransportAction.execute()方法会调用TransportReplicationAction的doExecute()方法

2)在TransportReplicationAction的doExecute()方法中执行ReroutePhase的run方法,run方法中根据请求的shardID获取到primary shardID,同时得到primary shard的NodeID,如果当前节点包含primary shard,则执行performLocalAction方法,否则执行performRemoteAction。

3)performLocalAction和performRemoteAction最终都将执行performAction方法,在performAction中我们可以看到,transportService发送请求:

代码语言:javascript
复制
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {}

4)transportService接收到请求后用的PrimaryOperationTransportHandler处理,至于PrimaryOperationTransportHandler是在TransportReplicationAction中注册的:

代码语言:javascript
复制
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());

5)PrimaryOperationTransportHandler则是一个primary操作的处理类,在这个类接收到信息之后调用AsyncPrimaryAction处理:

代码语言:javascript
复制
@Override
    public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
        new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
    }

6)在AsyncPrimaryAction中首先获取shard锁,如果成功的获取到锁则调用自身的onresponse()方法,否则将获取操作加入线程池:

代码语言:javascript
复制
            synchronized (this) {
            releasable = tryAcquire();
            if (releasable == null) {
                // blockOperations is executing, this operation will be retried by blockOperations once it finishes
                if (delayedOperations == null) {
                    delayedOperations = new ArrayList<>();
                }
                final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                if (executorOnDelay != null) {
                    delayedOperations.add(
                        new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                            new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                } else {
                    delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                }
                return;
            }
        }

7)在onresponse中,如果该primaryShardReference已经被移动了,则获取到正确的primary shard和nodeID重新发送请求。否则就用primaryShardReference直接处理:

代码语言:javascript
复制
 @Override
    public void onResponse(PrimaryShardReference primaryShardReference) {
        try {
            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
                // delegate primary phase to relocation target
                // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                final ShardRouting primary = primaryShardReference.routingEntry();
                assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                transportService.sendRequest(relocatingNode, transportPrimaryAction,
                    new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                    transportOptions,
                    new TransportChannelResponseHandler<Response>(logger, channel, "rerouting indexing to target primary " + primary,
                        TransportReplicationAction.this::newResponseInstance) {

                        @Override
                        public void handleResponse(Response response) {
                            setPhase(replicationTask, "finished");
                            super.handleResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            setPhase(replicationTask, "finished");
                            super.handleException(exp);
                        }
                    });
            } else {
                setPhase(replicationTask, "primary");
                final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
                final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
                final ActionListener<Response> listener = createResponseListener(primaryShardReference);
                createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference, executeOnReplicas)
                        .execute();
            }
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
            onFailure(e);
        }
    }

8)createReplicatedOperation看名字还以为直接就是副本处理了,点进去看了之后才发现是先执行primary,后执行replia。

代码语言:javascript
复制
 primaryResult = primary.perform(request);
    ...
 performOnReplicas(replicaRequest, shards);
主分片处理

主分片的处理调用的是PrimaryShardReference.perform()方法,在该方法中则是调用shardOperationOnPrimary()进行主分片的处理。

shardOperationOnPrimary()方法则是由TransportShardBulkAction来实现的,具体执行的步骤如下:

1)获取节点中所有的索引元数据

2)获取版本号

3)更新mapping

4)调用Engin底层的代码。比如说primary.delete(delete),primary.index(operation)等等。

5)写到tanslog中

副本分片和主分片类似,这里就不做过多解释。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.09.29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Bulk注册
  • 接收到请求
  • 每个shard的处理流程
  • 主分片处理
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档