前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch Multi Get、 Bulk API详解、原理与示例

Elasticsearch Multi Get、 Bulk API详解、原理与示例

作者头像
丁威
发布2019-06-10 17:46:01
2.1K0
发布2019-06-10 17:46:01
举报
文章被收录于专栏:中间件兴趣圈中间件兴趣圈

本文将详细介绍批量获取API(Multi Get API)与Bulk API。

1、Multi Get API

  • public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException
  • public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener<MultiGetResponse> listener)

其核心需要关注MultiGetRequest 。

从上面所知,mget及批量获取文档,通过add方法添加多个Item,每一个item代表一个文件获取请求,其相关字段已在get API中详细介绍,这里就不做过多详解。

Mget API使用示例

代码语言:javascript
复制
public static void testMget() {
       RestHighLevelClient client = EsClient.getClient();
       try {
           MultiGetRequest request = new MultiGetRequest();
           request.add("twitter", "_doc", "10");
           request.add("twitter", "_doc", "11");
           request.add("twitter", "_doc", "12");
           request.add("gisdemo", "_doc", "10");
           MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT);
           System.out.println(result);
       } catch (Throwable e) {
           e.printStackTrace();
       } finally {
           EsClient.close(client);
       }
   }

返回的结果其本质是一个 GetResponse的数组,不会因为其中一个失败,整个请求失败,但其结果中会标明每一个是否成功。其返回结果类图如下:

其字段过滤(Source filtering)、路由等机制与Get API相同,故不重复讲解。

2、Bluk API详解

Bulk API可以在一次API调用中包含多个索引操作,例如更新索引,删除索引等。其API定义如下:

  • public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
  • public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener<BulkResponse> listener)

其核心需要关注BulkRequest。

2.1BulkRequest详解

  • List<DocWriteRequest> requests:单个命令容器,DocWriteRequest的子类包括:IndexRequest、UpdateRequest、DeleteRequest。
  • private final Set<String> indices:requests涉及到的索引。
  • List<Object> payloads :有效载荷,6.4.0版本,貌似该字段意义不大,通常命令的请求体(负载数据)存放在DocWriteRequest对象中,例如IndexRequest的source字段。
  • protected TimeValue timeout:timeout机制,针对一个Bulk请求生效。
  • ActiveShardCount waitForActiveShards:针对整个Bulk请求有效。
  • private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:刷新策略。
  • private long sizeInBytes = 0:整个Bulk请求的大小。

通过add api为BulkRequest添加一个请求。

2.2 Bulk API请求格式详解

Bulk Rest请求协议基于如下格式:

代码语言:javascript
复制
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

其请求格式定义如下(restfull):

  • POST请求,其Content-Type为application/x-ndjson。
  • 每一个命令占用两行,每行的结束字符为\r\n。
  • 第一行为元数据,"opType" : {元数据}。
  • 第二行为有效载体(非必选),例如Index操作,其有效载荷为IndexRequest#source字段。
  • opType可选值 index、create、update、delete。
  • 公用元数据(index、create、update、delete)如下

1)_index :索引名

2)_type:类型名

3)_id:文档ID

4)routing:路由值

5)parent

6)version:数据版本号

7)version_type:版本类型

  • 各操作特有元数据

1、index | create

1)pipeline

2、update

1)retry_on_conflict :更新冲突时重试次数。

2)_source:字段过滤。

  • 有效载荷说明

1、index | create

其有效载荷为_source字段。

2、update

其有效载荷为:partial doc, upsert and script。

3、delete

没有有效载荷。

对请求格式为什么要设计成metdata+有效载体的方式,主要是为了在接受端节点(所谓的接受端节点是指收到命令的第一节点),只需解析metadata,然后将请求直接转发给对应的数据节点。

2.3 bulk API通用特性分析

2.3.1 版本管理

每一个Bulk条目拥有独自的version,存在于请求条目的item的元数据中。

2.3.2 路由

每一个Bulk条目各自生效。

2.3.3 Wait For Active Shards

通常可以设置BulkRequest#waitForActiveShards来要求Bulk批量执行之前要求处于激活的最小副本数。

2.3.4 Bulk Demo

代码语言:javascript
复制
public static final void testBulk() {
       RestHighLevelClient client = EsClient.getClient();
       try {
           IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12")
                   .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk"));           UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11")
                       .doc(new IndexRequest("twitter", "_doc", "11")
                               .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update")));           BulkRequest request = new BulkRequest();
           request.add(indexRequest);
           request.add(updateRequest);
           BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
           for (BulkItemResponse bulkItemResponse : bulkResponse) {
               if (bulkItemResponse.isFailed()) {
                   BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                   System.out.println(failure);
                   continue;
               }
               DocWriteResponse itemResponse = bulkItemResponse.getResponse();
               if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                       || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                   IndexResponse indexResponse = (IndexResponse) itemResponse;
                   System.out.println(indexRequest);
               } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                   UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                   System.out.println(updateRequest);
               } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                   DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                   System.out.println(deleteResponse);
               }
           }
       } catch (Exception e) {
           e.printStackTrace();
       } finally {
           EsClient.close(client);
       }
   }

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-11-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 中间件兴趣圈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档