通过客户端访问集群

最近更新时间:2019-06-21 12:23:18

腾讯云 ES 提供了丰富的 Restful 风格的 API,其官方和社区也推出了各个语言版本的 SDK 方便用户使用。
ES 在用户的 VPC 下提供了一个访问 ES 集群的 VIP,通过负载均衡的方式挂载了 ES 集群内的所有节点。这样做的目的,一方面是为了适配集群的弹性伸缩并保证其高可用性,当 ES 集群的节点发生变化时,VIP 会自动更新节点信息;另一方面,简化用户的操作,用户不用再关注集群节点 IP、port 等信息的变更。

说明:

不推荐通过 TransportClient 直接连接 Client 和 ES 节点。

ES 的 SDK 支持在创建连接时只配置一个节点的地址,并通过“节点嗅探”的功能嗅探出所有的节点来连接使用。这种使用方式有悖于我们推出 VIP 功能的初衷,增加了使用 ES 集群的复杂度。下面我们会简单介绍各个编程语言 SDK 包的使用方法,并说明如何关闭“节点嗅探”功能。

Java 客户端

ES 官方推荐使用 Java REST 客户端连接集群并进行数据操作。Java REST Client 有 Low Level 和 High Level 两种:

  • Java Low Level REST Client:使用该客户端需要将 HTTP 请求的 body 手动拼成 JSON 格式,HTTP 响应也必须将返回的 JSON 数据手动封装成对象;
  • Java High Level REST Client:该客户端基于 Low Level 客户端实现,提供 API 解决 Low Level 客户端需要手动转换数据格式的问题。

使用 Java High Level REST Client 访问集群,示例步骤与代码如下:

添加 Maven 依赖

说明:

建议 Java REST 客户端 API 版本,同 ES 集群服务端版本保持一致,腾讯云目前提供了不同版本的 ES,请注意选择相应版本的客户端。更多客户端 API 信息,可参考官方 ES API 版本兼容性说明

<dependency>
       <groupId>org.elasticsearch.client</groupId>
       <artifactId>elasticsearch-rest-high-level-client</artifactId>
       <version>6.4.3</version>
</dependency>

注意:

  • Client 版本需要与 ES 集群版本保持一致,否则可能会出现兼容性问题。此处的 Demo 适用于腾讯云 ES 6.4.3版本,其他版本的使用方法请参考 文档
  • Java High Level Client 构建于 Java Low Level Client 之上,两种 Client 都是基于 HTTP 协议连接 ES 集群,Java High Level Client 可提供的 API 种类随着版本升级会越来越多,如果当前版本的 Java High Level Client 提供的 API 不满足需求,可以通过升级 ES 集群版本和 Client 版本解决。
  • 使用 TCP 协议连接 ES 集群的 Transport Client 官方已经不再维护,建议使用使用 HTTP 协议连接集群的 Java High Level Client 或者 Java Low Level Client 。详请参考 elastic 官方文档:由 TransportClient 迁移至 Java High Level Client

示例代码

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;


public class test_es_sdk {
    private static Logger logger = Logger.getLogger(test_es_sdk.class);

    public  static void main(String[]args){
        BasicConfigurator.configure();
        // 设置验证信息,填写账号及密码
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials("user", "passwd"));
        // 初始化 RestClient, hostName 和 port 填写集群的内网 VIP 地址与端口
        RestClientBuilder builder = RestClient.builder(new HttpHost("xx.xx.xx.xx", 9200, "http"));
        // 设置认证信息
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
        // 设置超时时间
        builder.setMaxRetryTimeoutMillis(10000);
        // 由Low Level Client构造High Level Client
        RestHighLevelClient client = new RestHighLevelClient(builder);

        // 索引文档
        Map<String, Object> jsonMap = new HashMap<String, Object>();
        jsonMap.put("user", "bellen");
        jsonMap.put("name", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
                .source(jsonMap);

        try {
            // 获取响应结果
            IndexResponse indexResponse = client.index(indexRequest);
            String index = indexResponse.getIndex();
            String type = indexResponse.getType();
            String id = indexResponse.getId();
            long version = indexResponse.getVersion();

            if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                logger.info("doc indexed, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version);
            } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                logger.info("doc updated, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version);
            }
        }catch(ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                logger.error("version conflict");

            }
        }catch(Exception e){
            logger.error("execute index api failed, "+ e.toString());
        }


        // 查询文档
        GetRequest getRequest = new GetRequest(
                "posts",
                "doc",
                "1");
        try {
            // 获取响应结果
            GetResponse getResponse = client.get(getRequest);
            String index = getResponse.getIndex();
            String type = getResponse.getType();
            String id = getResponse.getId();
            if (getResponse.isExists()) {
                long version = getResponse.getVersion();
                String sourceAsString = getResponse.getSourceAsString();
                logger.info("get doc, index: "+ index +", type:"+ type +",id:"+ id+",version:"+version +", source:"+ sourceAsString);
            }
        }catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                logger.warn("doc not found");
            }
        }
        catch(Exception e){
            logger.error("execute get api failed, "+ e.toString());
        }

        // 关闭客户端
        try {
            client.close();
        }catch (Exception e){
            logger.error("close rest client exception:"+ e.toString());
        }
    }
}

Python 客户端

pip 安装

pip install elasticsearch

示例代码

Elasticsearch函数的参数中设置如下3个参数关闭节点嗅探:

  • sniff_on_start=False
  • sniff_on_connection_fail=False
  • sniffer_timeout=None
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://xx.xx.xx.xx:9200"],
      http_auth=('user', 'passwd'),
          sniff_on_start=False,
          sniff_on_connection_fail=False,
          sniffer_timeout=None)

res = es.index(index="my_index", doc_type="my_type", id=1, body={"title": "One", "tags": ["ruby"]})
print(res)
res = es.get(index="my_index", doc_type="my_type", id=1)
print(res['_source'])

PHP 客户端

为了避免节点的嗅探,您可以设置显示的连接池类为StaticConnectionPool,注意 不要使用节点嗅探连接池

示例代码

$client = ClientBuilder::create()
    ->setConnectionPool('\Elasticsearch\ConnectionPool\StaticConnectionPool', ["http://user:passwd@xx.xx.xx.xx:9200"])
    ->build();

Go 客户端

gopkg.in/olivere/elastic是在 Go 语言中广泛使用的一个由社区贡献的 SDK。此处的 Demo 适用于腾讯云 ES 6.4.3版本,其他版本的使用方法请参考 文档

安装 elastic

go get github.com/olivere/elastic

示例代码

NewClient函数的参数中设置elastic.SetSniff(false)关闭节点嗅探,设置elastic.SetHealthcheck(false)关闭对节点的健康检查。

import (
    "context"
    "fmt"

    "github.com/olivere/elastic"
)

func main() {
    client, err := elastic.NewClient(elastic.SetURL("http://user:passwd@xx.xx.xx.xx:9200"),
        elastic.SetSniff(false),elastic.SetHealthcheck(false))
    if err != nil {
        panic(err)
    }
    exists, err := client.IndexExists("twitter").Do(context.Background())
    if err != nil {
        panic(err)
    }
    fmt.Println(exists)
}