前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >把日志灌入到Elasticsearch的好处以及具体实现

把日志灌入到Elasticsearch的好处以及具体实现

作者头像
算法之名
发布2019-08-20 17:35:08
4810
发布2019-08-20 17:35:08
举报
文章被收录于专栏:算法之名算法之名算法之名

一般来讲一个高并发高性能的系统,日志是非常庞大的,随时可能高达几个T,一台服务器的硬盘极有可能装不下,而Elasticsearch的集群可以分布在不同的机器上,而又对整个集群作为一个整体,对其大容量的内容进行存储以及它的最牛掰的能力--检索.

首先在配置文件中做如下配置

elasticsearch:
  clusterName: aubin-cluster
  clusterNodes: 192.168.5.182:9300
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

   private String clusterName;

   private String clusterNodes;

    /**
     * 使用elasticsearch实现类时才触发
     *
     * @return
     */
   @Bean
    @ConditionalOnBean(value = EsLogServiceImpl.class)
   public TransportClient getESClient() {
      // 设置集群名字
      Settings settings = Settings.builder().put("cluster.name", this.clusterName).build();
      TransportClient client = new PreBuiltTransportClient(settings);
      try {
         // 读取的ip列表是以逗号分隔的
         for (String clusterNode : this.clusterNodes.split(",")) {
            String ip = clusterNode.split(":")[0];
            String port = clusterNode.split(":")[1];
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(ip), Integer.parseInt(port)));
         }
      } catch (UnknownHostException e) {
         e.printStackTrace();
      }

      return client;
   }
}

再定义一个操作接口

public interface LogService {

   void save(Log log);

   Page<Log> findLogs(Map<String, Object> params);

}

具体实现类为

@Service
public class EsLogServiceImpl implements LogService, ApplicationContextAware {

   private static final Logger logger = LoggerFactory.getLogger(EsLogServiceImpl.class);

   private static final String INDEX = "index_logs";
   private static final String TYPE = "type_logs";

   @Autowired
   private TransportClient client;

   @Async
   @Override
   public void save(Log log) {
      if (log.getCreateTime() == null) {
         log.setCreateTime(new Date());
      }
      if (log.getFlag() == null) {
         log.setFlag(Boolean.TRUE);
      }
      logger.info("{}", log);

      String string = JSONObject.toJSONString(log);

      IndexRequestBuilder builder = client.prepareIndex(INDEX, TYPE).setSource(string, XContentType.JSON);
      builder.execute();
   }

   @Override
   public Page<Log> findLogs(Map<String, Object> params) {
      SearchRequestBuilder builder = client.prepareSearch().setIndices(INDEX).setTypes(TYPE);
      if (!CollectionUtils.isEmpty(params)) {
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

         // 用户名模糊匹配
         String username = MapUtils.getString(params, "username");
         if (StringUtils.isNoneBlank(username)) {
            queryBuilder.must(QueryBuilders.wildcardQuery("username", "*" + username + "*"));
         }

         // 模块精确匹配
         String module = MapUtils.getString(params, "module");
         if (StringUtils.isNoneBlank(module)) {
            queryBuilder.must(QueryBuilders.matchQuery("module", module));
         }

         String flag = MapUtils.getString(params, "flag");
         if (StringUtils.isNoneBlank(flag)) {
            Boolean bool = Boolean.FALSE;
            if ("1".equals(flag) || "true".equalsIgnoreCase(flag)) {
               bool = Boolean.TRUE;
            }
            queryBuilder.must(QueryBuilders.matchQuery("flag", bool));
         }

         // 大于等于开始日期,格式yyyy-MM-dd
         String beginTime = MapUtils.getString(params, "beginTime");
         if (StringUtils.isNoneBlank(beginTime)) {
            // 转化为0点0分0秒
            Long timestamp = toTimestamp(beginTime + "T00:00:00");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").from(timestamp));
         }

         // 小于等于结束日期,格式yyyy-MM-dd
         String endTime = MapUtils.getString(params, "endTime");
         if (StringUtils.isNoneBlank(endTime)) {
            // 转化为23点59分59秒
            Long timestamp = toTimestamp(endTime + "T23:59:59");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").to(timestamp));
         }

         if (queryBuilder != null) {
            builder.setPostFilter(queryBuilder);
         }
      }

      builder.addSort("createTime", SortOrder.DESC);

      PageUtil.pageParamConver(params, true);
      Integer start = MapUtils.getInteger(params, PageUtil.START);
      if (start != null) {
         builder.setFrom(start);
      }

      Integer length = MapUtils.getInteger(params, PageUtil.LENGTH);
      if (length != null) {
         builder.setSize(length);
      }

      SearchResponse searchResponse = builder.get();

      SearchHits searchHits = searchResponse.getHits();
      // 总数量
      Long total = searchHits.getTotalHits();

      int size = searchHits.getHits().length;
      List<Log> list = new ArrayList<>(size);
      if (size > 0) {
         searchHits.forEach(hit -> {
            String val = hit.getSourceAsString();
            list.add(JSONObject.parseObject(val, Log.class));
         });
      }

      return new Page<>(total.intValue(), list);
   }

   private Long toTimestamp(String str) {
      LocalDateTime localDateTime = LocalDateTime.parse(str);
      Date date = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());

      return date.getTime();
   }

   private static ApplicationContext applicationContext = null;

   @Override
   public void setApplicationContext(ApplicationContext context) throws BeansException {
      applicationContext = context;
   }

   /**
    * 初始化日志es索引
    */
   @PostConstruct
   public void initIndex() {
      LogService logService = applicationContext.getBean(LogService.class);
      // 日志实现是否采用elasticsearch
      boolean flag = (logService instanceof EsLogServiceImpl);
      if (!flag) {
         return;
      }

      try {
         // 判断索引是否存在
         IndicesExistsResponse indicesExistsResponse = client.admin().indices()
               .exists(new IndicesExistsRequest(INDEX)).get();
         if (indicesExistsResponse.isExists()) {
            return;
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      } catch (ExecutionException e) {
         e.printStackTrace();
      }

      CreateIndexRequestBuilder requestBuilder = client.admin().indices().prepareCreate(INDEX);

      CreateIndexResponse createIndexResponse = requestBuilder.execute().actionGet();
      if (createIndexResponse.isAcknowledged()) {
         logger.info("索引:{},创建成功", INDEX);
      } else {
         logger.error("索引:{},创建失败", INDEX);
      }
   }

}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档