前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Bulk API

Bulk API

作者头像
程序猿杜小头
发布2022-12-01 21:33:24
3640
发布2022-12-01 21:33:24
举报
文章被收录于专栏:程序猿杜小头程序猿杜小头

Bulk API

Bulk API提供了一种在单个请求中针对批量文档执行idnexcreatedeleteupdate 操作的方法。显然,批量操作大大提升了CRUD操作的效率,因为它将多个IO请求归并到一个IO请求中。

Bulk API请求体是一种NDJSON(newline-delimited json)数据结构,NDJSON数据结构中每一行必须以换行符\n结尾,但这个换行符不需要显式添加,因为大多数文本编辑器会自动追加换行符。另外,Http Request Header中Content-Type值必须为application/x-ndjson

代码语言:javascript
复制
operation_and_meta_data
optional_source
operation_and_meta_data
optional_source
operation_and_meta_data
optional_source
...

1 Request

代码语言:javascript
复制
POST /_bulk
POST /<target>/_bulk

1.1 Path Parameter

target用于指定数据流名称、索引名称或索引别名,可选参数。

1.2 Query Parameter

1.2.1 pipeline

管道唯一标识,用于对文档进行预处理;可选参数,无默认值。

1.2.2 _source

_source值为true,则bulk api响应内容中会包含_source字段;可选参数,默认值为false。

1.2.3 refresh

可选参数,默认值为false。

参数值

描述

true

立即触发refresh操作

false

不立即触发refresh操作

wait_for

等待refresh操作

1.2.4 routing

routing参数主要用于指定主本分片;可选参数,无默认值。

1.2.5 timeout

timeout参数并不是指bulk api响应超时时间,而是针对每个文档操作的等待超时时间,比如index操作,可能涉及等待:automatic index creationdynamic mapping updateswaiting for active shards;可选参数,默认值1m。

1.2.6 wait_for_active_shards

一般,inedx操作在正式执行之前,需要等待一定数量的active shards,分片数量就是由wait_for_active_shards参数设定,wait_for_active_shards默认值为1,即一个主本分片即可,wait_for_active_shards最大值为一个主本分片与其所有副本分片之和。如果当前活跃分片数小于wait_for_active_shards值,那么index操作必须等待并重试。

1.3 Request Body

1.3.1 operation

操作名称

描述

create

文档若已存在,则不执行文档写入操作

delete

删除文档

update

更新文档

index

文档若已存在,则执行文档更新操作(upsert);文档若不存在,则执行文档写入操作

1.3.2 meta_data

操作名称

操作描述

_index

索引名称,如果请求路径参数缺失,则该参数必选

_id

文档唯一标识,可选,若缺失,则自动生成

require_alias

若值为true,则必须在元数据中追加索引别名,默认值为false

1.3.3 operation_source

参数

数据类型

描述

doc

object

对应update操作

object

对应index和create操作

1.4 实战

代码语言:javascript
复制
cat request_body_ndjson
-----------------------
{"index":{"_index":"bulk_test", "_id":"1"}}
{"field1":"index_value1","field2":"index_value2","field3":"index_value3","field4":"index_value4"}
{"index":{"_index":"bulk_test", "_id":"2"}}
{"field1":"index_value1"}
{"update":{"_index":"bulk_test", "_id":"1"}}
{"doc":{"field":"update_value"}}
{"delete":{"_index":"bulk_test", "_id":"2"}}
代码语言:javascript
复制
curl -request POST http://localhost:9200/_bulk \
  -header "Content-Type:application/x-ndjson" \
  -header "Authorization: Basic ZWxhc3RpYzpRd2UxMjMhQGNtc3M=" \
  --data-binary "@/apps/dukui/request_body_ndjson"
-----------------------
{
    "took": 311,
    "errors": false,
    "items": [
        {
            "index": {
                "_index": "bulk_test",
                "_type": "_doc",
                "_id": "1",
                "_version": 1,
                "result": "created",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 12,
                "_primary_term": 1,
                "status": 201
            }
        },
        {
            "index": {
                "_index": "bulk_test",
                "_type": "_doc",
                "_id": "2",
                "_version": 1,
                "result": "created",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 13,
                "_primary_term": 1,
                "status": 201
            }
        },
        {
            "update": {
                "_index": "bulk_test",
                "_type": "_doc",
                "_id": "1",
                "_version": 2,
                "result": "updated",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 14,
                "_primary_term": 1,
                "get": {
                    "_seq_no": 14,
                    "_primary_term": 1,
                    "found": true,
                    "_source": {
                        "field1": "index_value1",
                        "field2": "index_value2",
                        "field3": "index_value3",
                        "field4": "index_value4",
                        "field": "update_value"
                    }
                },
                "status": 200
            }
        },
        {
            "delete": {
                "_index": "bulk_test",
                "_type": "_doc",
                "_id": "2",
                "_version": 2,
                "result": "deleted",
                "_shards": {
                    "total": 2,
                    "successful": 1,
                    "failed": 0
                },
                "_seq_no": 15,
                "_primary_term": 1,
                "status": 200
            }
        }
    ]
}

2 RestHighLevelClient Bulk

2.1 注入RestHighLevelClient实例

代码语言:javascript
复制
@Resource
private RestHighLevelClient restHighLevelClient;

2.2 构建BulkRequest实例

代码语言:javascript
复制
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout(TimeValue.timeValueSeconds(5));
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
for (int j = 0; j < 10; j++) {
    IndexRequest indexRequest = new IndexRequest("bulk")
            .id(String.valueOf(j + 1))
            .source(
                    XContentType.JSON,
                    "title", "Elasticsearch Bulk API",
                    "author", "optimus prime",
                    "date", LocalDateTime.now()
            );
    bulkRequest.add(indexRequest);
}

2.3 执行BulkRequest

2.3.1 同步
代码语言:javascript
复制
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
2.3.2 异步
代码语言:javascript
复制
restHighLevelClient.bulkAsync(
        bulkRequest,
        RequestOptions.DEFAULT,
        new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {

            }

            @Override
            public void onFailure(Exception e) {

            }
        });

3 BulkProcessor Bulk

BulkProcessor允许我们基于不同策略来配置flush操作的触发时机;同时,还能轻松控制BulkRequest的并发执行数;另外,BulkProcessor是线程安全的。

3.1 配置

代码语言:javascript
复制
@Bean
public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) {
    BulkProcessor.Builder builder = BulkProcessor.builder(
            (bulkRequest, bulkResponseActionListener) ->
                    restHighLevelClient.bulkAsync(
                            bulkRequest,
                            RequestOptions.DEFAULT,
                            bulkResponseActionListener
                    )
            ,
            new BulkProcessor.Listener() {
                /*
                 * 每个BulkRequest执行之前该逻辑被调用
                 */
                @Override
                public void beforeBulk(long executionId, BulkRequest bulkRequest) {

                }

                /*
                 * 每个BulkRequest执行成功之后该逻辑被调用
                 */
                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse bulkResponse) {
                    for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
                        if (bulkItemResponse.isFailed()) {
                            BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
                            continue;
                        }

                        DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                        switch (bulkItemResponse.getOpType()) {
                            case INDEX:
                            case CREATE:
                                IndexResponse indexResponse = (IndexResponse) itemResponse;
                                break;
                            case UPDATE:
                                UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                                break;
                            case DELETE:
                                DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        }
                    }
                }

                /*
                 * 每个BulkRequest执行失败之后该逻辑被调用
                 */
                @Override
                public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {

                }
            });
    /*
     * 当BulkRequest中action数量达到1000时执行flush操作;默认值为1000,若值为-1意味着禁用该配置项
     */
    builder.setBulkActions(1000);
    /*
     * 当BulkRequest中action字节体量达到5MB时执行flush操作;默认值为5MB,若值为-1意味着禁用该配置项
     */
    builder.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB));
    /*
     * BulkRequest并发执行数,在一定程度上,可以抑制BulkRequest的堆积;默认值为1,若值为0意味着串行执行BulkRequest;若值为-1意味着禁用该配置项
     */
    builder.setConcurrentRequests(1);
    /*
     * 每间隔10秒执行一次flush操作,默认禁用该配置
     */
    builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
    /*
     * 当BulkRequest执行失败时,BulkProcessor如何重试;默认重试机制为BackoffPolicy.exponentialBackoff()
     */    
    builder.setBackoffPolicy(
            BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)
    );
    /*
     * 构建BulkProcessor实例对象
     */
    return builder.build();
}

3.2 注入BulkProcessor实例

代码语言:javascript
复制
@Resource
private BulkProcessor bulkProcessor;

3.3 构建IndexRequest/DeleteRequest/DocWriteRequest实例

代码语言:javascript
复制
for (int j = 0; j < 10; j++) {
    IndexRequest indexRequest = new IndexRequest("bulk")
            .id(String.valueOf(j + 1))
            .source(
                    XContentType.JSON,
                    "title", "Elasticsearch Bulk API",
                    "author", "optimus prime",
                    "date", LocalDateTime.now()
            );
    bulkProcessor.add(indexRequest);
}
bulkProcessor.flush();

参考文档

  1. https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
  2. https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿杜小头 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Bulk API
    • 1 Request
      • 1.1 Path Parameter
      • 1.2 Query Parameter
      • 1.3 Request Body
      • 1.4 实战
    • 2 RestHighLevelClient Bulk
      • 2.1 注入RestHighLevelClient实例
      • 2.2 构建BulkRequest实例
      • 2.3 执行BulkRequest
    • 3 BulkProcessor Bulk
      • 3.1 配置
      • 3.2 注入BulkProcessor实例
      • 3.3 构建IndexRequest/DeleteRequest/DocWriteRequest实例
    • 参考文档
    相关产品与服务
    Elasticsearch Service
    腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档