Java SDK

最近更新时间:2024-07-11 11:53:21

我的收藏
本文介绍通过 Java SDK 将数据投递到 Elasticsearch Serverless 服务中的相关操作。

前提条件

已创建 Serverless 空间及索引,并已获取内网访问地址、用户名、密码、索引名等信息。
已在项目中添加 Java 依赖(以 Maven 为例):
<!-- 建议使用 7.5.1 及以上版本的 elasticsearch-rest-high-level-client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.5.1</version>
</dependency>

代码 demo

以下示例代码演示如何通过 Java SDK 向 Elasticsearch Serverless 服务写入数据,并进行简单查询。
package com.ckafka_to_es; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Date; import java.util.HashMap; import java.util.Map; public class BulkAndSearchExample { public static void main(String[] args) throws IOException { // 创建一个CredentialsProvider final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "xxxxxxxxxx")); // 替换为你的用户名和密码 // 创建一个RestHighLevelClient实例 RestHighLevelClient client = new RestHighLevelClient( // space-12345678.ap-guangzhou.qcloudes.com为空间地址 RestClient.builder( new HttpHost("space-12345678.ap-guangzhou.qcloudes.com", 80, "http")) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setDefaultCredentialsProvider(credentialsProvider))); try { // 创建一个BulkRequest BulkRequest request = new BulkRequest(); // 创建一个文档 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("user", "kimchy"); jsonMap.put("@timestamp", new Date()); jsonMap.put("message", "trying out Elasticsearch serverless"); // 创建一个IndexRequest,并将其添加到BulkRequest中 IndexRequest indexRequest = new IndexRequest("my_index") .source(jsonMap); request.add(indexRequest); 
// 执行BulkRequest BulkResponse response = client.bulk(request, RequestOptions.DEFAULT); if (response.hasFailures()) { System.out.println("Bulk operations failed: " + response.buildFailureMessage()); } else { System.out.println("Bulk operations succeeded."); } // 创建一个SearchRequest SearchRequest searchRequest = new SearchRequest("my_index"); // 索引名称 // 创建一个SearchSourceBuilder SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); // 添加匹配所有文档的查询 searchSourceBuilder.query(QueryBuilders.matchAllQuery()); // 将SearchSourceBuilder应用到SearchRequest searchRequest.source(searchSourceBuilder); // 执行搜索 SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); // 打印搜索结果 for (SearchHit hit : searchResponse.getHits()) { System.out.println("Found document with ID: " + hit.getId()); // 如果需要,还可以打印文档的源数据 System.out.println("Source: " + hit.getSourceAsString()); } } finally { // 关闭client client.close(); } } }