前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >springboot集成elasticsearch7实现全文检索及分页

springboot集成elasticsearch7实现全文检索及分页

作者头像
一缕82年的清风
发布2021-12-06 09:59:39
1.9K0
发布2021-12-06 09:59:39
举报
文章被收录于专栏:lsqingfeng

springboot集成elasticsearch7实现全文检索及分页

elasticsearch系列文章前面已经更新过三篇(https://blog.csdn.net/lsqingfeng/category_10219329.html)(建议先看下这三篇文章),分别讲解了elasticsearch7.2的安装,和springboot的集成以及简单的使用。前面都是通过demo的方式,主要讲解了他的一下简单使用,最近已经将es成功应用到了项目当中,本篇文章主要系统的介绍一下到底在项目中该如何使用es.

一. 概述

为什么选择elasticsearch? 我想能看到这篇文章的人应该都了解过了。 因为当你已经开始想知道springboot如何集成es的时候,说明你已经过了了解elasticsearch的阶段。 简单的说,就是在数据量很大的情况下,elasticsearch通过内部的“倒叙索引”使其查询性能更佳。接下来就是要面对怎么查的问题。一般情况下,我们都会使用数据库作为项目中的数据存储单元,得益于数据库对于事务的超强控制能力。而现在我们要引入es, es本身其实也是一个数据存储单元,只不过由于他的内部数据结构和数据库的结构不一致,使其具备了查询高效的能力。我们在学习的时候完全可以把它当做一个查询速度很快的数据库。

​ 那么我们要想从es中查询数据,es中必须得有数据,而往往我们的数据都是存储在数据库中, 所以查询的第一个就是将数据中的数据同步到es中(也有一些应用单独的使用es存储数据)。关于数据库和es的同步方案,网上有很多。最简单的一种,你往数据库存一条,同时就往es里存一条。你往数据库修改一条,同时也修改一下es,这种方式的有点就是操作简单,只需要在调用mysql的dao的同事,调用一下es的增删改查即可。缺点是要处理二者的同步性问题,比如往mysql插入成功,往es插入失败的情况的处理。第二种方式,我们可以使用定时任务,定时任务每隔一段时间从mysql中把数据全量读出来,然后往es中同步一次,这种方式的优点时对于原来的业务代码没有任何侵入,缺点也很明显,就是定时任务的通用缺点,实时性差,并且每次全量同步,可能导致查询压力大。还有一种方式是通过数据库的binlog, binlog大家应该比较了解,做数据库主从的同步都是通过binlog实现的。而mysql和es之间也可以通过binlog进行同步,这样不需要对代码做任何的修改,有兴趣的可以研究一下阿里的canal,就是专门做binlog同步的。

在我的项目中,经过一顿思考,决定使用两种方式,实时同步(增量同步)和定时任务(全量同步)的方式。当有对应数据发生增改删的操作,实时向es中同步。同时使用定时任务,每隔一段时间全量同步一次。

二. 准备工作
  1. 准备好自己的springboot项目,这里不会详细介绍springboot相关内容
  2. 在服务器上或者自己的虚拟机上安装好elasticsearch7.2 (全部代码均采用这个版本)
  3. 在服务器或者自己的虚拟机上安装好elasticsearch-head插件,也是7.2
  4. 在服务器或者自己的虚拟机上安装好kibana7.2

关于2.3.4这三项内容的安装请参考文章:

https://cloud.tencent.com/developer/article/1911266

如果下载软件慢的,可以使用国内镜像加速下载:https://www.newbe.pro/tags/Mirrors/

一定要确保上述服务安装成功:有head的页面,和kibana的页面。

三. 集成

集成的方式主要有两种,一种是使用es提供的原生客户端,一种是使用springboot-data提供的客户端spring-data-elasticsearch, 像一般我们使用redis,一般都会选择使用spring-data-redis, spring已经封装好了的一些工具方法,使用起来很方便。但是这里我要介绍的是原生客户端的方案。主要原因就是spring-data支持的es版本太低,虽然近期spring-data-es已经更新可以支持比较新的版本的es,但是同时对于springboot的版本要求也比较高。我项目中所使用spring-boot版本较低,而对应支持的es版本更低,所以我直接集成es的原生客户端。具体原因可以参看:https://blog.csdn.net/lsqingfeng/article/details/106398077。也可自行查看spring-data-es官网了解es和springboot版本对应关系做选择。

关于集成,这里使用的es中提供的 HighLevelRestClient,高级别客户端,这也是官方推荐的,另外es7以上,已经不推荐使用TransportClient了,es7也取消了type的概念。

集成方式,引入jar包,添加配置即可

  1. pom.xml
代码语言:javascript
复制
 <dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-high-level-client</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
   <groupId>org.elasticsearch.client</groupId>
   <artifactId>elasticsearch-rest-client</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>
 <dependency>
   <groupId>org.elasticsearch</groupId>
   <artifactId>elasticsearch</artifactId>
   <version>${elasticsearch.version}</version>
 </dependency>

这里 的版本号定义的是7.2.0

代码语言:javascript
复制
  <elasticsearch.version>7.2.0</elasticsearch.version>

截止到目前,就已经集成完毕,接下来就可以在代码中使用客户端操作es了。

四. 自定义注解创建索引

关于这块的内容参见:https://cloud.tencent.com/developer/article/1911164

这里简单说一下,索引就相当于是表结构,es本身也是存储数据的,既然是存储,就需要定一个结构,比如有哪些字段,每个字段是什么类型。但是痛点是如果我们定义的这个结构如果比较复杂,那么用原生的方法代码会很多,很麻烦,所以我们可以自己定义一套注解,加入到实体类上,这样就可以根据实体类,让es自己去创建索引,很方便。就类似于以前hibernate,可以根据我们写的实体类自动生成表。

关于注解,这里也给出以下,在之前文章基础上做了些改动,主要就是加入了EsId注解,可以将制定字段作为es的id,如果不加这个,es默认id是自动生成的,有了这个,那么我们可以让mysql的id直接作为es的id,方便更新。

代码语言:javascript
复制
package xx.xxx.xxx.es.annotation;

import java.lang.annotation.*;

/**
 * Es 文档注解,用于做索引实体映射
 * 作用在类上
 * @author ls
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Inherited
public @interface Document {

    /**
     * index : 索引名称
     * @return
     */
    String index();

    /**
     * 类型名称
     * @return
     */
    String type();

}
代码语言:javascript
复制
package xx.xxx.xxx.es.annotation;

import java.lang.annotation.*;

/**
 * 用于标识使用 该字段作为ES数据中的id
 * @author sh.Liu
 * @create: 2020-07-22
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface EsId {
}
代码语言:javascript
复制
package xx.xxx.xxx.es.annotation;


import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;

import java.lang.annotation.*;

/**
 * 作用在字段上,用于定义类型,映射关系
 * @author ls
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface Field  {

    FieldType type() default FieldType.TEXT;

    /**
     * 指定分词器
     * @return
     */
    AnalyzerType analyzer() default AnalyzerType.STANDARD;
}

两个枚举:

代码语言:javascript
复制
package xx.xxx.xxx.es.enums;

import lombok.Getter;

/**
 * @className: AnalyzerType
 * @description:
 * @author: sh.Liu
 * @create: 2020-05-27 11:37
 */
@Getter
public enum AnalyzerType {

    NO("不使用分词"),
    /**
     * 标准分词,默认分词器
     */
    STANDARD("standard"),

    /**
     * ik_smart:会做最粗粒度的拆分;已被分出的词语将不会再次被其它词语占有
     */
    IK_SMART("ik_smart"),

    /**
     * ik_max_word :会将文本做最细粒度的拆分;尽可能多的拆分出词语
     */
    IK_MAX_WORD("ik_max_word")

    ;

    private String type;

    AnalyzerType(String type){
        this.type = type;
    }
}
代码语言:javascript
复制
package xx.xxx.xxx.es.enums;

import lombok.Getter;

/**
 * es 类型参看
 * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
 */
@Getter
public enum FieldType {
    /**
     * text
     */
    TEXT("text"),

    KEYWORD("keyword"),

    INTEGER("integer"),

    DOUBLE("double"),

    DATE("date"),

    /**
     * 单条数据
     */
    OBJECT("object"),

    /**
     * 嵌套数组
     */
    NESTED("nested"),


    ;


    FieldType(String type){
        this.type = type;
    }

    private String type;


}

这里直接给出我封装的es工具类,如果有不成熟的地方,也请大家多多指教。

代码语言:javascript
复制
package com.cestc.common.es;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.enums.FieldType;
import com.github.pagehelper.PageInfo;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;

/**
 * @className: EsUtil
 * @description: es 操作工具类;
 *      这里均采用同步调用的方式
 * @author: sh.Liu
 * @create: 2020-05-25 09:41
 */
@Component
@Slf4j
public class ElasticsearchUtil {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * 创建索引(默认分片数为5和副本数为1)
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean createIndex(Class clazz) throws Exception {
        Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.settings(Settings.builder()
                // 设置分片数为3, 副本为2
                .put("index.number_of_shards", 3)
                .put("index.number_of_replicas", 2)
        );
        request.mapping(generateBuilder(clazz));
        CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        boolean acknowledged = response.isAcknowledged();
        // 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
        boolean shardsAcknowledged = response.isShardsAcknowledged();
        if (acknowledged || shardsAcknowledged) {
            log.info("创建索引成功!索引名称为{}", indexName);
            return true;
        }
        return false;
    }

    /**
     * 创建索引(默认分片数为5和副本数为1)
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean createIndexIfNotExist(Class clazz) throws Exception {
        Document declaredAnnotation = (Document)clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();

        boolean indexExists = isIndexExists(indexName);
        if (!indexExists) {
            CreateIndexRequest request = new CreateIndexRequest(indexName);
            request.settings(Settings.builder()
                    // 设置分片数为3, 副本为2
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 2)
            );
            request.mapping(generateBuilder(clazz));
            CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            // 指示是否所有节点都已确认请求
            boolean acknowledged = response.isAcknowledged();
            // 指示是否在超时之前为索引中的每个分片启动了必需的分片副本数
            boolean shardsAcknowledged = response.isShardsAcknowledged();
            if (acknowledged || shardsAcknowledged) {
                log.info("创建索引成功!索引名称为{}", indexName);
                return true;
            }
        }
        return false;
    }

    /**
     * 更新索引(默认分片数为5和副本数为1):
     * 只能给索引上添加一些不存在的字段
     * 已经存在的映射不能改
     *
     * @param clazz 根据实体自动映射es索引
     * @throws IOException
     */
    public boolean updateIndex(Class clazz) throws Exception {
        Document declaredAnnotation = (Document )clazz.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", clazz.getName()));
        }
        String indexName = declaredAnnotation.index();
        PutMappingRequest request = new PutMappingRequest(indexName);

        request.source(generateBuilder(clazz));
        AcknowledgedResponse response = restHighLevelClient.indices().putMapping(request, RequestOptions.DEFAULT);
        // 指示是否所有节点都已确认请求
        boolean acknowledged = response.isAcknowledged();

        if (acknowledged ) {
            log.info("更新索引索引成功!索引名称为{}", indexName);
            return true;
        }
        return false;
    }
    /**
     * 删除索引
     * @param indexName
     * @return
     */
    public boolean delIndex(String indexName){
        boolean acknowledged = false;
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
            AcknowledgedResponse delete = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
            acknowledged = delete.isAcknowledged();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return acknowledged;
    }

    /**
     * 判断索引是否存在
     * @param indexName
     * @return
     */
    public boolean isIndexExists(String indexName){
        boolean exists = false;
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
            getIndexRequest.humanReadable(true);
            exists = restHighLevelClient.indices().exists(getIndexRequest,RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return exists;
    }


    /**
     * 添加单条数据
     * 提供多种方式:
     *  1. json
     *  2. map
     *      Map<String, Object> jsonMap = new HashMap<>();
     *      jsonMap.put("user", "kimchy");
     *      jsonMap.put("postDate", new Date());
     *      jsonMap.put("message", "trying out Elasticsearch");
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *          .id("1").source(jsonMap);
     *  3. builder
     *      XContentBuilder builder = XContentFactory.jsonBuilder();
     *      builder.startObject();
     *      {
     *          builder.field("user", "kimchy");
     *          builder.timeField("postDate", new Date());
     *          builder.field("message", "trying out Elasticsearch");
     *      }
     *      builder.endObject();
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *      .id("1").source(builder);
     * 4. source:
     *      IndexRequest indexRequest = new IndexRequest("posts")
     *     .id("1")
     *     .source("user", "kimchy",
     *         "postDate", new Date(),
     *         "message", "trying out Elasticsearch");
     *
     *   报错:  Validation Failed: 1: type is missing;
     *      加入两个jar包解决
     *
     *   提供新增或修改的功能
     *
     * @return
     */
    public IndexResponse index(Object o) throws Exception {
        Document declaredAnnotation = (Document )o.getClass().getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", o.getClass().getName()));
        }
        String indexName = declaredAnnotation.index();

        IndexRequest request = new IndexRequest(indexName);
        Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
        if (fieldByAnnotation != null) {
            fieldByAnnotation.setAccessible(true);
            try {
                Object id = fieldByAnnotation.get(o);
                request =request.id(id.toString());
            } catch (IllegalAccessException e) {
                log.error("获取id字段出错:{}", e);
            }
        }

        String userJson = JSON.toJSONString(o);
        request.source(userJson, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(request, RequestOptions.DEFAULT);
        return indexResponse;
    }


    /**
     * 根据id查询
     * @return
     */
    public String queryById(String indexName, String id) throws IOException {
        GetRequest getRequest = new GetRequest(indexName, id);
        // getRequest.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

        GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        String jsonStr = getResponse.getSourceAsString();
        return jsonStr;
    }

    /**
     * 查询封装返回json字符串
     * @param indexName
     * @param searchSourceBuilder
     * @return
     * @throws IOException
     */
    public String search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        log.info("返回总数为:" + hits.getTotalHits());
        return jsonArray.toJSONString();
    }

    /**
     * 查询封装,带分页
     * @param searchSourceBuilder
     * @param pageNum
     * @param pageSize
     * @param s
     * @param <T>
     * @return
     * @throws IOException
     */
    public <T> PageInfo<T> search(SearchSourceBuilder searchSourceBuilder, int pageNum, int pageSize, Class<T> s) throws Exception {
        Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
        }
        String indexName = declaredAnnotation.index();
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        log.info("返回总数为:" + hits.getTotalHits());
        int total = (int)hits.getTotalHits().value;

        // 封装分页
        List<T> list = jsonArray.toJavaList(s);
        PageInfo<T> page = new PageInfo<>();
        page.setList(list);
        page.setPageNum(pageNum);
        page.setPageSize(pageSize);
        page.setTotal(total);
        page.setPages(total== 0 ? 0: (total%pageSize == 0 ? total / pageSize : (total / pageSize) + 1));
        page.setHasNextPage(page.getPageNum() < page.getPages());
        return page;
    }

    /**
     * 查询封装,返回集合
     * @param searchSourceBuilder
     * @param s
     * @param <T>
     * @return
     * @throws IOException
     */
    public <T> List<T> search(SearchSourceBuilder searchSourceBuilder, Class<T> s) throws Exception {
        Document declaredAnnotation = (Document )s.getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [Document], please check", s.getName()));
        }
        String indexName = declaredAnnotation.index();
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.source(searchSourceBuilder);
        searchRequest.scroll(TimeValue.timeValueMinutes(1L));
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHits hits = searchResponse.getHits();
        JSONArray jsonArray = new JSONArray();
        for (SearchHit hit : hits) {
            String sourceAsString = hit.getSourceAsString();
            JSONObject jsonObject = JSON.parseObject(sourceAsString);
            jsonArray.add(jsonObject);
        }
        // 封装分页
        List<T> list = jsonArray.toJavaList(s);
        return list;
    }


    /**
     * 批量插入文档
     * 文档存在 则插入
     * 文档不存在 则更新
     * @param list
     * @return
     */
    public <T> boolean batchSaveOrUpdate(List<T> list) throws Exception {
        Object o1 = list.get(0);
        Document declaredAnnotation = (Document )o1.getClass().getDeclaredAnnotation(Document.class);
        if(declaredAnnotation == null){
            throw new Exception(String.format("class name: %s can not find Annotation [@Document], please check", o1.getClass().getName()));
        }
        String indexName = declaredAnnotation.index();

        BulkRequest request = new BulkRequest(indexName);
        for (Object o : list) {
            String jsonStr = JSON.toJSONString(o);
            IndexRequest indexReq = new IndexRequest().source(jsonStr, XContentType.JSON);

            Field fieldByAnnotation = getFieldByAnnotation(o, EsId.class);
            if (fieldByAnnotation != null) {
                fieldByAnnotation.setAccessible(true);
                try {
                    Object id = fieldByAnnotation.get(o);
                    indexReq = indexReq.id(id.toString());
                } catch (IllegalAccessException e) {
                    log.error("获取id字段出错:{}", e);
                }
            }
            request.add(indexReq);
        }
        BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);

        for(BulkItemResponse bulkItemResponse : bulkResponse){
            DocWriteResponse itemResponse = bulkItemResponse.getResponse();
            IndexResponse indexResponse = (IndexResponse) itemResponse;
            log.info("单条返回结果:{}", indexResponse);
            if(bulkItemResponse.isFailed()){
                log.error("es 返回错误{}",bulkItemResponse.getFailureMessage());
                return false;
            }
        }
        return true;
    }

    /**
     * 删除文档
     * @param indexName: 索引名称
     * @param docId:     文档id
     */
    public boolean deleteDoc(String indexName, String docId) throws IOException {
        DeleteRequest request = new DeleteRequest(indexName, docId);
        DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        // 解析response
        String index = deleteResponse.getIndex();
        String id = deleteResponse.getId();
        long version = deleteResponse.getVersion();
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure :
                    shardInfo.getFailures()) {
                String reason = failure.reason();
                log.info("删除失败,原因为 {}", reason);
            }
        }
        return true;
    }

    /**
     * 根据json类型更新文档
     * @param indexName
     * @param docId
     * @param o
     * @return
     * @throws IOException
     */
    public boolean updateDoc(String indexName, String docId, Object o) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, docId);
        request.doc(JSON.toJSONString(o), XContentType.JSON);
        UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        String index = updateResponse.getIndex();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {

        }
        return false;
    }

    /**
     * 根据Map类型更新文档
     * @param indexName
     * @param docId
     * @param map
     * @return
     * @throws IOException
     */
    public boolean updateDoc(String indexName, String docId, Map<String, Object> map) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, docId);
        request.doc(map);
        UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        String index = updateResponse.getIndex();
        String id = updateResponse.getId();
        long version = updateResponse.getVersion();
        if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
            return true;
        } else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
        } else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {

        }
        return false;
    }


    public XContentBuilder generateBuilder(Class clazz) throws IOException {
        // 获取索引名称及类型
        Document doc = (Document) clazz.getAnnotation(Document.class);
        System.out.println(doc.index());
        System.out.println(doc.type());

        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        builder.startObject("properties");
        Field[] declaredFields = clazz.getDeclaredFields();
        for (Field f : declaredFields) {
            if (f.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
                // 获取注解
                com.cestc.common.es.annotation.Field  declaredAnnotation = f.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);

                // 如果嵌套对象:
                /**
                 * {
                 *   "mappings": {
                 *     "properties": {
                 *       "region": {
                 *         "type": "keyword"
                 *       },
                 *       "manager": {
                 *         "properties": {
                 *           "age":  { "type": "integer" },
                 *           "name": {
                 *             "properties": {
                 *               "first": { "type": "text" },
                 *               "last":  { "type": "text" }
                 *             }
                 *           }
                 *         }
                 *       }
                 *     }
                 *   }
                 * }
                 */
                if (declaredAnnotation.type() == FieldType.OBJECT) {
                    // 获取当前类的对象-- Action
                    Class<?> type = f.getType();
                    Field[] df2 = type.getDeclaredFields();
                    builder.startObject(f.getName());
                    builder.startObject("properties");
                    // 遍历该对象中的所有属性
                    for (Field f2 : df2) {
                        if (f2.isAnnotationPresent(com.cestc.common.es.annotation.Field.class)) {
                            // 获取注解
                            com.cestc.common.es.annotation.Field declaredAnnotation2 = f2.getDeclaredAnnotation(com.cestc.common.es.annotation.Field.class);
                            builder.startObject(f2.getName());
                            builder.field("type", declaredAnnotation2.type().getType());
                            // keyword不需要分词
                            if (declaredAnnotation2.type() == FieldType.TEXT) {
                                builder.field("analyzer", declaredAnnotation2.analyzer().getType());
                            }
                            if (declaredAnnotation2.type() == FieldType.DATE) {
                                builder.field("format", "yyyy-MM-dd HH:mm:ss");
                            }
                            builder.endObject();
                        }
                    }
                    builder.endObject();
                    builder.endObject();

                }else{
                    builder.startObject(f.getName());
                    builder.field("type", declaredAnnotation.type().getType());
                    // keyword不需要分词
                    if (declaredAnnotation.type() == FieldType.TEXT) {
                        builder.field("analyzer", declaredAnnotation.analyzer().getType());
                    }
                    if (declaredAnnotation.type() == FieldType.DATE) {
                        builder.field("format", "yyyy-MM-dd HH:mm:ss");
                    }
                    builder.endObject();
                }
            }
        }
        // 对应property
        builder.endObject();
        builder.endObject();
        return builder;
    }


   public static Field getFieldByAnnotation(Object o ,Class annotationClass){
       Field[] declaredFields = o.getClass().getDeclaredFields();
       if (declaredFields != null && declaredFields.length >0) {
           for(Field f : declaredFields){
                if (f.isAnnotationPresent(annotationClass)) {
                    return f;
                }
           }
       }
       return null;
   }

}

这里会使用反射通过上面标注的注解来使用:比如我有一个实体,根据这个实体自动创建索引的方式如下; 首先在实体上加入我们自定义的注解,来设置索引名称,字段的类型,分词器是什么。这里字符串类型在es中有两种,一是KEY_WORD,不分词, 二是TEXT:会分词

代码语言:javascript
复制
package xx.xxx.xxx.es.pojo;

import xx.xxx.xxx.es.annotation.Document;
import xx.xxx.xxx.es.annotation.EsId;
import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.enums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.math.BigDecimal;

/**
 * @className: EsEvent
 * @description:
 * @author: sh.Liu
 * @date: 2020-07-22 10:51
 */
@Data
@Document(index = "es_event", type = "") 
public class EsEvent {
    private static final long serialVersionUID=1L;

    @EsId
    @Field(type = FieldType.KEYWORD)
    private Integer eventId;

    /**
     * 唯一标识码
     */
    @Field(type = FieldType.KEYWORD)
    private String uniqueCode;

    /**
     * 任务号
     */
    @Field(type = FieldType.KEYWORD)
    private String eventCode;

    /**
     * 事件来源编号
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventSrcId;

    /**
     * 事件来源名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventSrcName;

    /**
     * 来源分组
     */
    @Field(type = FieldType.KEYWORD)
    private String srcGroupCode;

    /**
     * 事件大类编码
     */
    @Field(type = FieldType.KEYWORD)
    private String eventTypeCode;

    /**
     * 事件大类名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventTypeName;

    /**
     * 事件小类编码
     */
    @Field(type = FieldType.KEYWORD)
    private String eventSubtypeCode;

    /**
     * 事件小类名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventSubtypeName;

    /**
     * 重要程度
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventGradeId;

    /**
     * 重要程度名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventGradeName;

    /**
     *紧急程度标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventUrgencyId;

    /**
     *紧急程度名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventUrgencyName;

    /**
     *事件级别标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventLevelId;

    /**
     *事件级别名称
     */
    @Field(type = FieldType.KEYWORD)
    private String eventLevelName;

    /**
     *事件升级标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventUpgradeFlag;

    /**
     *处置级别标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer dealLevelId;

    /**
     *处置级别标识
     */
    @Field(type = FieldType.KEYWORD)
    private String dealLevelName;

    /**
     *公众上报人名称
     */
    @Field(type = FieldType.TEXT , analyzer = AnalyzerType.IK_SMART)
    private String publicReporterName;

    /**
     *公众上报人身份证号
     */
    @Field(type = FieldType.KEYWORD)
    private String publicReporterIdcard;

    /**
     *公众上报人联系方式
     */
    @Field(type = FieldType.KEYWORD)
    private String publicReporterTel;
    /**
     * 事件描述
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String eventDesc;
    /**
     * 地址
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String address;
    /**
     * 地区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String areaRegionName;

    /**
     * 地区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String areaRegionCode;

    /**
     * 社区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String commRegionName;

    /**
     * 区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String commRegionCode;

    /**
     * 街道名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String streetRegionName;

    /**
     * 街道编码
     */
    @Field(type = FieldType.KEYWORD)
    private String streetRegionCode;

    /**
     * 社区名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String regionName;

    /**
     * 社区编码
     */
    @Field(type = FieldType.KEYWORD)
    private String regionCode;

    /**
     * 经度
     */
    private BigDecimal coordX;

    /**
     * 纬度
     */
    private BigDecimal coordY;

    /**
     *坐标系
     */
    private String mapcoordinate;

    /**
     *网格员标识
     */
    private Integer griderId;

    /**
     *网格员标识
     */
    private String griderName;

    /**
     *网格员电话
     */
    private String griderPhone;

    /**
     *核实状态标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer verifyStateId;

    /**
     *核查状态标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer checkStateId;

    /**
     *事件建立时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String createTime;

    /**
     *流程结束时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String endTime;

    /**
     *延期天数
     */
    private Float postponedDays;

    /**
     *延期标志
     */
    private Integer postponedFlag;

    /**
     *受理时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String acceptTime;

    /**
     *立案时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String establishTime;

    /**
     *调度时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String dispatchTime;

    /**
     *流程开始时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procStartTime;

    /**
     *流程结束时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procEndTime;

    /**
     *流程截止时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procDeadLine;

    /**
     *流程警告时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procWarningTime;

    /**
     *处置开始时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String funcBeginTime;

    /**
     *处置完成时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String funcFinishTime;

    /**
     *自处置标识
     */
    private Integer gridDealFlag;

    /**
     *跨网格标志
     */
    private Integer overGridFlag;

    /**
     *是否督办
     */
    @Field(type = FieldType.INTEGER)
    private Integer pressFlag;

    /**
     *是否催办
     */
    @Field(type = FieldType.INTEGER)
    private Integer hurryFlag;

    /**
     *超期标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer overtimeFlag;

    /**
     *活动属性
     */
    @Field(type = FieldType.INTEGER)
    private Integer actPropertyId;

    /**
     *活动属性名称
     */
    @Field(type = FieldType.KEYWORD)
    private String actPropertyName;

    /**
     *流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procInstId;

    /**
     *流程定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procDefId;

    /**
     *事件状态
     */
    @Field(type = FieldType.INTEGER)
    private Integer eventStateId;

    /**
     * 上一操作项
     */
    @Field(type = FieldType.KEYWORD)
    private String preActionName;

    /**
     * 登记人Id
     */
    @Field(type = FieldType.INTEGER)
    private Integer registerId;

    /**
     * 登记人姓名
     */
    @Field(analyzer = AnalyzerType.IK_SMART)
    private String registerName;

    /**
     * 回访标识:0-未回访 1-已回访 2-无法回访
     */
    @Field(type = FieldType.INTEGER)
    private Integer visitorStateId;

    /**
     * 删除标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer deleteFlag;

    /**
     * 删除用户标识
     */
    @Field(type = FieldType.INTEGER)
    private Integer deleteUserId;

    /**
     * 删除时间
     */
    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String deleteTime;

    /**
     * 是否下发督查
     * 0:否,1:是
     */
    @Field(type = FieldType.INTEGER)
    private Integer overseerFlag;

    @Field(type = FieldType.DATE)
    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String updateTime;

    @Field(type = FieldType.OBJECT)
    private EsAct act;

}

act结构:

代码语言:javascript
复制
package xx.xxx.xxx.es.pojo;

import xx.xxx.xxx.es.annotation.Field;
import xx.xxx.xxx.es.ednums.AnalyzerType;
import xx.xxx.xxx.es.enums.FieldType;
import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;

/**
 * @className: EsAct
 * @description: es act表
 * @author: sh.Liu
 * @date: 2020-07-22 13:18
 */
@Data
public class EsAct implements Serializable {
    private static final long serialVersionUID = 1L;

    @Field(type = FieldType.INTEGER)
    private Integer actId;

    /**
     * 任务标识
     */
    @Field(type = FieldType.KEYWORD)
    private String taskId;

    /**
     * 流程定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procDefId;

    /**
     * 流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String procInstId;

    /**
     * 子流程实例标识
     */
    @Field(type = FieldType.KEYWORD)
    private String subInstId;

    /**
     * 节点定义标识
     */
    @Field(type = FieldType.KEYWORD)
    private String nodeId;

    /**
     * 节点定义名称
     */
    @Field(type = FieldType.KEYWORD)
    private String nodeName;

    /**
     * 业务主键标识
     */
    @Field(type = FieldType.KEYWORD)
    private String bizId;

    /**
     * 参与者标识
     */
    @Field(type = FieldType.KEYWORD)
    private String partId;
    /**
     * 参与者姓名
     */
    @Field(type = FieldType.TEXT)
    private String partName;
    /**
     * 部门id
     */
    @Field(type = FieldType.KEYWORD)
    private String unitId;
    /**
     * 部门名称
     */
    @Field(type = FieldType.TEXT)
    private String unitName;

    /**
     * 角色标识
     */
    @Field(type = FieldType.KEYWORD)
    private String roleId;

    /**
     * 角色名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String roleName;

    /**
     * 上一活动标识
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActId;

    /**
     * 上一活动参与者标识
     */
    private String prePartId;

    /**
     * 上一活动参与者名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String prePartName;

    /**
     * 上一活动定义标识
     */
    private String preNodeId;

    /**
     * 上一活动定义名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preNodeName;

    /**
     * 上一活动意见
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preOpinion;

    /**
     * 上一活动操作项名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActionName;

    /**
     * 上一活动操作项显示名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String preActionLabel;

    /**
     * 创建时间
     */
    @Field(type = FieldType.DATE)
    private String createTime;

    /**
     * 截止时间
     */
    @Field(type = FieldType.DATE)
    private String deadLine;

    /**
     * 警告时间
     */
    @Field(type = FieldType.DATE)
    private String warningTime;

    /**
     * 结束时间
     */
    @Field(type = FieldType.DATE)
    private String endTime;

    /**
     * 活动红绿灯
     */
    @Field(type = FieldType.INTEGER)
    private Integer actTimeStateId;

    /**
     * 活动时限
     */
    @Field(type = FieldType.DOUBLE)
    private BigDecimal timeLimit;

    /**
     * 计时单位
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeUnit;

    /**
     * 活动时限分钟
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeLimitM;

    /**
     * 已用时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actUsedTime;

    /**
     * 剩余时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actRemainTime;

    /**
     * 活动时限信息
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actLimitInfo;

    /**
     * 活动已用时间字符串
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actUsedTimeChar;

    /**
     * 活动剩余时间字符串
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actRemainTimeChar;

    /**
     * 累计计时标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer timeAddUpFlag;

    /**
     * 暂停前节点用时
     */
    @Field(type = FieldType.INTEGER)
    private Integer actUsedTimeBeforeStop;

    /**
     * 恢复计时时间
     */
    @Field(type = FieldType.DATE)
    private String actRestart;

    /**
     * 已读标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer readFlag;

    /**
     * 已读时间
     */
    @Field(type = FieldType.DATE)
    private String readTime;

    /**
     * 批转意见
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String transOpinion;

    /**
     * 操作项名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actionName;

    /**
     * 操作项显示名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actionLabel;

    /**
     * 活动属性id
     */
    @Field(type = FieldType.INTEGER)
    private Integer actPropertyId;

    /**
     * 活动属性名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String actPropertyName;

    /**
     * 抄送参与者
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String ccPart;

    /**
     * 抄送参与者名称
     */
    @Field(type = FieldType.TEXT, analyzer = AnalyzerType.IK_SMART)
    private String ccPartName;

    /**
     * 抄送标志
     */
    @Field(type = FieldType.INTEGER)
    private Integer ccFlag;
}

创建索引方式:

代码语言:javascript
复制
@RequestMapping("createIndex")
public String createIndex() {
  try {
    esUtil.createIndexIfNotExist(EsEvent.class);

  } catch (Exception e) {
    e.printStackTrace();
  }

  return "success";
}

这样就会根据创建一个叫做 es_event的索引(实体注解上配置)同时EsEvent 和 EsAct是一对一的依赖方式。

通过head插件可以看到索引结构(部分):

五. 分页查询封装:

我的项目中需要使用es做一个分页查询,查询条件通过前端传过来,所以我们需要做一下动态拼接,同时还有排序功能。我们将查询条件和分页条件都封装到一个VO中。

代码语言:javascript
复制
/**
     * 综合查询列表
     *
     * @return
     */
    @PostMapping("/comprehensive/list")
    public Result<Result.ResultPage<ComprehensiveQueryEventVO>> listComprehensiveQuery(@RequestBody ComprehensiveQueryEventVO comprehensiveQueryEventVO) {
        PageInfo<ComprehensiveQueryEventVO> returnPageInfo = null;
        try {
            returnPageInfo = esService.pageComprehensiveQuery(comprehensiveQueryEventVO);
        } catch (Exception e) {
            log.error("es 综合查询异常,开始使用数据库做综合查询,错误为 :{}", e);
            // es异常,使用数据库查询
            PageHelper.startPage(comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize());
            List<ComprehensiveQueryEventVO> comprehensiveQueryEventVOS = eventInfoService.listComprehensiveQuery(comprehensiveQueryEventVO);
            PageInfo<ComprehensiveQueryEventVO> queryPageInfo = new PageInfo<>(comprehensiveQueryEventVOS);
            returnPageInfo = new PageInfo<>();
            // 此处统一VO,此处使用可忽略
            Type type = new TypeToken<List<ComprehensiveQueryEventVO>>() {
            }.getType();
            ModelMapper modelMapper = new ModelMapper();
            modelMapper.map(queryPageInfo, returnPageInfo);
            returnPageInfo.setList(new ModelMapper().map(queryPageInfo.getList(), type));
        }
        Result.ResultPage<ComprehensiveQueryEventVO> resultPage = Result.ResultPage.build(returnPageInfo);
        Result<Result.ResultPage<ComprehensiveQueryEventVO>> result = Result.success(resultPage);
        return result;
    }
代码语言:javascript
复制
package com.cestc.cooperative.pojo.vo;


import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import lombok.Data;

import java.io.Serializable;
import java.util.List;

/**
 * 综合查询对应javabean
 *
 * @author wanggang
 * @date 2020/1/3
 */
@Data
@ColumnWidth(30)
public class ComprehensiveQueryEventVO implements Serializable {

    @ExcelProperty("序号")
    private Integer orderNum;

    /**
     * 任务号
     */
    @ExcelProperty("事件编号")
    private String eventCode;

    /**
     * 事件大类名称
     */
    @ExcelProperty("类型")
    private String eventTypeName;

    /**
     * 事件大类编码
     */
    @ExcelProperty("事件大类编码")
    private String eventTypeCode;

    /**
     * 事件小类编码
     */
    @ExcelProperty("事件小类编码")
    private String eventSubtypeCode;

    /**
     * 事件小类名称
     */
    @ExcelProperty("事件小类名称")
    private String eventSubtypeName;

    /**
     * 地区名称
     */
    @ExcelProperty("地区名称")
    private String areaRegionName;

    /**
     * 地区编码
     */
    private String areaRegionCode;

    /**
     * 社区名称
     */
    @ExcelProperty("社区名称")
    private String commRegionName;

    /**
     * 社区编码
     */
    private String commRegionCode;

    /**
     * 街道名称
     */
    @ExcelProperty("街道名称")
    private String streetRegionName;

    /**
     * 街道编码
     */
    private String streetRegionCode;

    /**
     * 社区名称
     */
    @ExcelProperty("区域")
    private String regionName;

    /**
     * 社区编码
     */
    private String regionCode;

    /**
     * 地址描述
     */
    @ExcelProperty("地址")
    private String address;

    /**
     * 重要程度标识
     */
    @ExcelProperty("等级标识")
    private Integer eventGradeId;

    /**
     * 重要程度名称
     */
    @ExcelProperty("等级")
    private String eventGradeName;

    /**
     * 活动状态标识
     */
    private Integer actTimeStateId;

    /**
     * 活动状态名称
     */
    @ExcelProperty("计时状态")
    private String actTimeStateName;

    /**
     * 是否督办
     */
    @ExcelProperty("是否督办标志位")
    private Integer pressFlag;

    @ExcelProperty("督办")
    private String pressFlagName;

    /**
     * 是否督查
     */
    @ExcelProperty("是否督查标志位")
    private Integer overseerFlag;

    @ExcelProperty("督查")
    private String overseerFlagName;

    /**
     * 流程开始时间
     */
    @ExcelProperty("上报时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procStartTime;

    /**
     * 流程截止时间
     */
    @ExcelProperty("截止时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procDeadLine;

    /**
     * 事件描述
     */
    @ExcelProperty("事件描述")
    private String eventDesc;

    /**
     * 事件状态
     */
    @ExcelProperty("事件状态Id")
    private Integer eventStateId;

    @ExcelProperty("事件状态")
    private String eventStateName;

    /**
     * 事件来源标识
     */
    @ExcelProperty("事件来源标识")
    private Integer eventSrcId;

    /**
     * 事件来源名称
     */
    @ExcelProperty("事件来源")
    private String eventSrcName;

    /**
     * 流程结束时间
     */
    @ExcelProperty("办结时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String procEndTime;

    /**
     * 超时时长
     */
    @ExcelProperty("超时时长")
    private String timeoutDuration;

    /**
     * 回访时间
     */
    @ExcelProperty("回访时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String visitorTime;

    /**
     * 满意度
     */
    @ExcelProperty("满意度")
    private String resultLabel;

    /**
     * 上报开始时间
     */
    @ExcelProperty("上报开始时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String reportStartTime;

    /**
     * 上报结束时间
     */
    @ExcelProperty("上报结束时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String reportEndTime;

    /**
     * 截止开始时间
     */
    @ExcelProperty("截止开始时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String deadLineStartTime;

    /**
     * 截止结束时间
     */
    @ExcelProperty("截止结束时间")
//    @JsonFormat(pattern="yyyy-MM-dd HH:mm:ss",timezone="GMT+8")
    private String deadLineEndTime;

    /**
     * 上报人电话
     */
    @ExcelProperty("上报人")
    private String publicReporterName;

    @ExcelProperty("联系电话")
    private String phone;

    /**
     * 超时状态(流程红绿灯)
     */
    private Integer procTimeStateId;

    /**
     * 超时状态对应名称
     */
    private String procTimeStateName;

    /**
     * 导出字段List
     */
    private List<String> exportColumnList;

    /**
     * 关键字
     */
    private String keyword;

    /**
     * 活动截止时间
     */
    private String deadLine;

    private Integer pageNum;

    private Integer pageSize;

    /**
     * 排序字段
     */
    private String orderField;

    /**
     * 排序方式
     */
    private String orderSort;

    /**
     * 事件Id
     */
    private Integer eventId;

}

分页查询方法:

代码语言:javascript
复制
@Override
public PageInfo<ComprehensiveQueryEventVO> pageComprehensiveQuery(ComprehensiveQueryEventVO comprehensiveQueryEventVO) throws Exception {
        SearchSourceBuilder searchSourceBuilder = getSearchSourceBuilder(comprehensiveQueryEventVO);
        PageInfo page = esUtil.search(searchSourceBuilder, comprehensiveQueryEventVO.getPageNum(), comprehensiveQueryEventVO.getPageSize(), EsEvent.class);
        // 转换
        List<ComprehensiveQueryEventVO> list = convertList(page.getList());
        page.setList(list);

        return page;
}


/**
     * 拼接综合查询 查询条件
     * @param comprehensiveQueryEventVO
     * @return
     */
private SearchSourceBuilder getSearchSourceBuilder(ComprehensiveQueryEventVO comprehensiveQueryEventVO){
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        if (comprehensiveQueryEventVO.getPageNum() == null) {
            comprehensiveQueryEventVO.setPageNum(1);
        }

        if (comprehensiveQueryEventVO.getPageSize() == null) {
            comprehensiveQueryEventVO.setPageSize(10000);
        }

        sourceBuilder.from((comprehensiveQueryEventVO.getPageNum()-1)*comprehensiveQueryEventVO.getPageSize());
        sourceBuilder.size(comprehensiveQueryEventVO.getPageSize());

        // 上报时间拼接时/分/秒
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
            comprehensiveQueryEventVO.setReportStartTime(comprehensiveQueryEventVO.getReportStartTime() + " 00:00:00");
        }
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
            comprehensiveQueryEventVO.setReportEndTime(comprehensiveQueryEventVO.getReportEndTime() + " 23:59:59");
        }
        // 截止时间拼接时/分/秒
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
            comprehensiveQueryEventVO.setDeadLineStartTime(comprehensiveQueryEventVO.getDeadLineStartTime() + " 00:00:00");
        }
        if (!ObjectUtils.isEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
            comprehensiveQueryEventVO.setDeadLineEndTime(comprehensiveQueryEventVO.getDeadLineEndTime() + " 23:59:59");
        }

        // 符合条件查询
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery();

        // event_state_id != 0 and event_state_id != 11 and delete_flag != 1
        boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 0));
        boolBuilder.mustNot(QueryBuilders.termQuery("eventStateId", 11));
        boolBuilder.mustNot(QueryBuilders.termQuery("delateFlag", 1));

        // 动态条件----keyword
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getKeyword())) {
            boolBuilder.must(QueryBuilders.queryStringQuery(comprehensiveQueryEventVO.getKeyword()));
        }

        // 拼接动态查询条件
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventCode", comprehensiveQueryEventVO.getEventCode()));
        }

        if (comprehensiveQueryEventVO.getEventGradeId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventGradeId", comprehensiveQueryEventVO.getEventGradeId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventGradeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventGradeNam", comprehensiveQueryEventVO.getEventGradeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventTypeName", comprehensiveQueryEventVO.getEventTypeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventTypeCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventTypeCode", comprehensiveQueryEventVO.getEventTypeCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSubtypeName", comprehensiveQueryEventVO.getEventSubtypeName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSubtypeCode())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSubtypeCode", comprehensiveQueryEventVO.getEventSubtypeCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("areaRegionName", comprehensiveQueryEventVO.getAreaRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getAreaRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("areaRegionCode", comprehensiveQueryEventVO.getAreaRegionCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("commRegionName", comprehensiveQueryEventVO.getCommRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getCommRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("commRegionCode", comprehensiveQueryEventVO.getCommRegionCode()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionName())) {
            boolBuilder.must(QueryBuilders.termQuery("streetRegionName", comprehensiveQueryEventVO.getStreetRegionName()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getStreetRegionCode())) {
            boolBuilder.must(QueryBuilders.termQuery("streetRegionCode", comprehensiveQueryEventVO.getStreetRegionName()));
        }

        if (comprehensiveQueryEventVO.getEventSrcId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventSrcId", comprehensiveQueryEventVO.getEventSrcId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getEventSrcName())) {
            boolBuilder.must(QueryBuilders.termQuery("eventSrcName", comprehensiveQueryEventVO.getEventSrcName()));
        }

        if (comprehensiveQueryEventVO.getProcTimeStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("procTimeStateId", comprehensiveQueryEventVO.getProcTimeStateId()));
        }

        if (comprehensiveQueryEventVO.getPressFlag() != null) {
            boolBuilder.must(QueryBuilders.termQuery("pressFlag", comprehensiveQueryEventVO.getPressFlag()));
        }

        if (comprehensiveQueryEventVO.getOverseerFlag() != null) {
            boolBuilder.must(QueryBuilders.termQuery("overseerFlag", comprehensiveQueryEventVO.getOverseerFlag()));
        }

        if (comprehensiveQueryEventVO.getEventStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("eventStateId", comprehensiveQueryEventVO.getEventStateId()));
        }

        if (comprehensiveQueryEventVO.getActTimeStateId() != null) {
            boolBuilder.must(QueryBuilders.termQuery("act.actTimeStateId", comprehensiveQueryEventVO.getActTimeStateId()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportStartTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").gte(comprehensiveQueryEventVO.getReportStartTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getReportEndTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("procStartTime").lte(comprehensiveQueryEventVO.getReportEndTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineStartTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").gte(comprehensiveQueryEventVO.getDeadLineStartTime()));
        }

        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getDeadLineEndTime())) {
            boolBuilder.must(QueryBuilders.rangeQuery("act.deadLine").lte(comprehensiveQueryEventVO.getDeadLineEndTime()));
        }
        sourceBuilder.query(boolBuilder);

        // 排序:
        if (StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderField()) && StringUtils.isNotEmpty(comprehensiveQueryEventVO.getOrderSort())) {
            String esOrderField = null;
            if (EventInfoServiceImpl.QueryFieldEnum.PROC_START_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "procStartTime";
            } else if (EventInfoServiceImpl.QueryFieldEnum.PROC_DEAD_LINE.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "act.deadLine";
            } else if (EventInfoServiceImpl.QueryFieldEnum.PROC_END_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "procEndTime";
            } else if (EventInfoServiceImpl.QueryFieldEnum.VISITOR_TIME.getFieldName().equals(comprehensiveQueryEventVO.getOrderField())) {
                esOrderField = "visitor.visitorTime";
            }
            FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(esOrderField);
            fieldSortBuilder = fieldSortBuilder.order("orderDesc".equals(comprehensiveQueryEventVO.getOrderSort()) ? SortOrder.DESC : SortOrder.ASC);
            sourceBuilder.sort(fieldSortBuilder);
        }

    return sourceBuilder;
}
六. 数据同步

基本的方法都有了,下面就是数据同步了,我们需要在所有的对数据库有增删改操作的地方,同时去操作es,这一部分可以分散在代码的各个角落,所以推荐大家通过aop去处理,去定义对应的切面,在切面中统一处理。我这里利用了消息队列,主要原因是想做成异步,因为有些操作比较本身已经比较耗时了,所以不想在操作后,还去等待es的处理结果,所以直接扔到队列里,由队列异步处理:

发送到队列代码:

代码语言:javascript
复制
// 同步es
mqService.sendEs(SendEsVO.builder().eventId(Integer.valueOf(actionVO.getBizId())).build());
代码语言:javascript
复制
@Override
public void sendEs(SendEsVO sendEsVO) {
	logger.info("发送mq请求【同步ES】===》" + JSONObject.toJSONString(sendEsVO));
	rabbitTemplate.convertAndSend(MqBaseConst.MQ_EXCHANGE_ES,
	MqBaseConst.MQ_KEY_ES, JSONObject.toJSONString(sendEsVO));
}

队列消费者代码:

代码语言:javascript
复制
@Component
@Slf4j
public class EsMqListener {

    private final EsService esService;

    public EsMqListener(EsService esService) {
        this.esService = esService;
    }

    @RabbitHandler
    @RabbitListener(queues = MqBaseConst.MQ_QUEUE_ES)
    public void receiveGatewayMsg(String msg){
        log.info("【es】消息系统收到消息,内容为 {}",msg);
        if (StringUtils.isNoneEmpty(msg)) {
            SendEsVO sendEsVO = JSON.parseObject(msg, SendEsVO.class);

            // 判断操作类型,目前都是 单条的saveOrUpdate 操作
            try {
                esService.saveOrUpdate(sendEsVO.getEventId());
            } catch (Exception e) {
                log.error("eventId: [{}], 同步到es失败,错误为:{}", sendEsVO.getEventId(), e);
                //todo: 失败后的处理
                return;
            }
            log.info("eventId: [{}], 同步到es成功", sendEsVO.getEventId());
        }
    }
}
代码语言:javascript
复制
@Override
public boolean saveOrUpdate(Integer eventId) throws Exception {

  // 1. 如果索引不存在则创建
  boolean re = esUtil.createIndexIfNotExist(EsEvent.class);
  // 如果返回true, 代表该索引不存在,已经创建了,那么此时应该做一次全量同步
  if (re) {
    batchSaveOrUpdate();
    return true;
  }

  // 2. 执行到此处,说明索引存在,那么做单条数据同步
  EventDO eventDO = eventService.getById(eventId);
  EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
  // 查询act
  CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
  if (cooActDO != null) {
    EsAct act =  ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
    event.setAct(act);
  }

  IndexResponse indexResponse = esUtil.index(event);
  if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    log.info("eventId {}, 在es中对应的数据创建成功");
    return true;
  } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    log.info("eventId {}, 在es中对应的数据更新成功");
    return true;
  }
  return false;
}

同时定时任务做全量同步:

代码语言:javascript
复制
/**
 * @className: MysqlToEsTaskServiceImpl
 * @description: mysql数据同步到es的定时任务
 * @author: sh.Liu
 * @date: 2020-07-24 16:28
 */
@Service
@Slf4j
public class MysqlToEsTaskServiceImpl implements MysqlToEsTaskService {

    private final EsService esService;

    public MysqlToEsTaskServiceImpl(EsService esService) {
        this.esService = esService;
    }

    @Override
    public void syncEventData() {
        log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务开始执行。。。");
        try {
            esService.batchSaveOrUpdate();
        } catch (Exception e) {
            log.error("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行失败,{}", e);
        }
        log.info("将 mysql【事件数据】 全量同步到 elasticsearch 定时任务执行结束。。。");

    }
}

全量同步代码;

代码语言:javascript
复制
/**
     * mysql数据库和es全量同步
     * @return
     * @throws Exception
     */
@Override
public boolean batchSaveOrUpdate() throws Exception {
  // 1. 如果索引不存在则创建
  esUtil.createIndexIfNotExist(EsEvent.class);

  // 2. 全量同步
  List<EsEvent> esEventList = new ArrayList<>();
  // 查询所有es数据,不包含草稿箱  eventStateId != 0
  List<EventDO> list = eventService.list(
    new LambdaQueryWrapper<EventDO>().ne(EventDO::getEventStateId,EventStateEnum.EVENTSTATE_DRAFT.getCode()));
  if (list != null && list.size() > 0) {
    for (EventDO eventDO : list) {
      EsEvent event = ModelMapperUtil.map(eventDO, EsEvent.class, "yyyy-MM-dd HH:mm:ss");
      // 查询act
      CooActDO cooActDO = actService.getOne(new LambdaQueryWrapper<CooActDO>().eq(CooActDO::getBizId, event.getEventId()));
      if (cooActDO != null) {
        EsAct act =  ModelMapperUtil.map(cooActDO, EsAct.class, "yyyy-MM-dd HH:mm:ss");
        event.setAct(act);
      }
      esEventList.add(event);
    }
  }
  boolean b = esUtil.batchSaveOrUpdate(esEventList);
  return b;
}

然后将上述方法配置到xxl-job中即可。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/07/27 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • springboot集成elasticsearch7实现全文检索及分页
    • 一. 概述
      • 二. 准备工作
        • 三. 集成
          • 四. 自定义注解创建索引
            • 五. 分页查询封装:
              • 六. 数据同步
              相关产品与服务
              Elasticsearch Service
              腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档