ES的读取分为Get和Search两种操作,这两种读取操作有较大的差异,GET/MGET必须指定三元组:index、_type、_id。 也就是说,根据文档id从正排索引中获取内容。而Search不指定_id,根据关键词从倒排索引中获取内容。本章分析GET/MGET过程,下一章分析Search过程。
一个GET请求的简单例子(来自官网)如下:
curl -XGET http://127.0.0.1: 9200/website/blog/1?pretty
"_ index" : "website",
"_type" : "blog",
"id" : "1",
"version" : 21,
" found" : true,
"_source" : {
"first name" : "John",
"last_ name" : "Smith",
"age" : 25,
"about" : "I
"love to go rock c1 imbing",
"interests" :[
"sports",
"music"
]
}
}
与写请求相同,GET请求时可以在URI中设置一些可选参数,如下表所示。
搜索和读取文档都属于读操作,可以从主分片或副分片中读取数据。读取单个文档的流程(图片来自官网)如下图所示。
这个例子中的索引有一个主分片和两个副分片。以下是从主分片或副分片中读取时的步骤:
NODE1作为协调节点,会将客户端请求轮询发送到集群的所有副本来实现负载均衡。
在读取时,文档可能已经存在于主分片上,但还没有复制到副分片。在这种情况下,读请求命中副分片时可能会报告文档不存在,但是命中主分片可能成功返回文档。一旦写请求成功返回给客户端,则意味着文档在主分片和副分片都是可用的。
GET/MGET
流程涉及两个节点:协调节点和数据节点,流程如下图所示。.
执行本流程的线程池:http_server_worker
TransportSingleShardAction
类用来处理存在于一个单个(主或副)分片上的读请求。将请求转发到目标节点,如果请求执行失败,则尝试转发到其他节点读取。在收到读请求后,处理过程如下。
TransportSingleShardAction.AsyncSingleAction
构造函数中,准备集群状态、节点列表等信息。shardid
,也就是文档应该落在哪个分片上。shardid
后,结合请求参数中指定的优先级和集群状态确定目标节点,由于分片可能存在多个副本,因此计算出的是一个列表。private AsyncSingleAction (Request request, ActionListener<Response> listener) {
ClusterState clusterState = clusterService.state();
//集群nodes列表
nodes = clusterState.nodes();
//解析请求,更新自定义routing
resolveRequest(clusterState, internalRequest);
//根据路由算法计算得到目的shard迭代器,或者根据优先级选择目标节点
this.shardIt = shards(clusterState, internalRequest);
}
具体的路由算法参考写流程分析。
作为协调节点,向目标节点转发请求,或者目标是本地节点,直接读取数据。发送函数声明了如何对Response进行处理:AsyncSingleAction 类中声明对Response进行处理的函数。无论请求在本节点处理还是发送到其他节点,均对Response执行相同的处理逻辑:
private void perform (@Nullable final Exception currentFailure) {
DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (node == null) {
onFailure (shardRouting, new NoShardAvailableActionException(shardRouting.shardId()));
} else {
inte.rnalRequest.request().internalShardId = shardRouting.shardId();
transportService.sendRequest(node, . . .
public void handleResponse (final Response response) {
listener.onResponse(response);
}
public void handleException (TransportException exp) {
onFailure(shardRouting, exp);
}
]);
}
}
发送的具体过程:
private void sendLocalRequest (long requestId, final String action, final
TransportRequest request, TransportRequestOptions options) {
inal DirectResponseChannel channel = new DirectResponseChannel (logger, localNode, action, requestId, this, threadPool);
try {
//根据action获取注册的reg
final RequestHandlerRegistry reg = getRequestHandler(action);
reg.processMessageReceived (request, channel);
}
}
private void onFailure (ShardRouting shardRouting, Exception e) {
perform(e);
}
内容路由结束时构造了目标节点列表的迭代器,重试发送时,目标节点选择迭代器的下一个。
执行本流程的线程池:get
数据节点接收协调节点请求的入口为:TransportSingleShardAction.ShardTransportHandler#messageReceived
读取数据并组织成Response,给客户端channel返回:
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
Response response = shardOperation (request,request.internalShardId);
channel.sendResponse (response);
}
shardOperation先检查是否需要refresh,然后调用indexShard.getService().get()读取数据并存储到GetResult中。
在ShardGetService#get()函数中,调用:
GetResult getResult = innerGet();
获取结果。GetResult 类用于存储读取的真实数据内容。核心的数据读取实现在ShardGetService#innerGet()函数中:
private GetResult innerGet(...) {
final Collection<String> types;
/ /处理all选项
if (type == null || type.equals("_ all")) {
....
}
Engine. GetResult get = null;
for (String typeX : types) {
//调用Engine读取数据
get = indexShard.get(new Engine.Get (realtime, typeX, id, uidTerm).version (version).versionType (versionType));
try {
//过滤返回结果
return innerGetLoadFromStoredFields (type, id, gFields, fetchSourceContext, get, mapperService);
} finally {
get.release();
}
}
InternalEngine#get过程会加读锁。处理realtime选项,如果为true,则先判断是否有数据可以刷盘,然后调用Searcher进行读取。Searcher 是对IndexSearcher的封装。
在早期的ES版本中,如果开启( 默认) realtime,则会尝试从translog 中读取,刚写入不久的数据可以从translog中读取;从ES5.x开始不会从translog中读取,只从Lucene中读orealtime的实现机制变成依靠refresh 实现。参考官方链接:https://github.com/elastic/elasticsearch/pull/20102
public GetResult get (Get get, BiFunction<String, SearcherScope, Searcher>searcherFactory) throws EngineException {
try (ReleasableLock ignored = readLock.acquire() ) {
ensureOpen();
SearcherScope scope;
//处理realtime选项,判断是否需要刷盘
if (get.realtime()) {
//versionMap中的值是写入索引的时候添加的,不会写磁盘
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT.EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())){
throw new VersionConflictEngineException(...);
}
//执行刷盘操作
refresh ("realtime get", Searcher Scope . INTERNAL);
}
scope = SearcherScope.INTERNAL;
} else {
scope = SearcherScope.EXTERNAL;
}
//调用 Searcher 读取数据
return getFromSearcher(get, searcherFactory, scope) ;
}
MGET的主要处理类: TransportMultiGetAction,通过封装单个GET请求实现,处理流程如下图所示。
主要流程如下:
回复的消息中文档顺序与请求的顺序一致。如果部分文档读取失败,则不影响其他结果,检索失败的doc会在回复信息中标出。
我们需要警惕实时读取特性,GET API默认是实时的,实时的意思是写完了可以立刻读取,但仅限于GET、MGET操作,不包括搜索。在5.x版本之前,GET/MGET的实时读取依赖于从translog中读取实现,5.x 版本之后的版本改为refresh,因此系统对实时读取的支持会对写入速度有负面影响。
由此引出另一个较深层次的问题是, update操作需要先GET再写,为了保证一致性, update调用GET时将realtime选项设置为true,并且不可配置。因此update操作可能会导致refresh生成新的Lucene分段。.
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。