专栏首页算法之名把日志灌入到Elasticsearch的好处以及具体实现

把日志灌入到Elasticsearch的好处以及具体实现

一般来讲一个高并发高性能的系统,日志是非常庞大的,随时可能高达几个T,一台服务器的硬盘极有可能装不下,而Elasticsearch的集群可以分布在不同的机器上,而又对整个集群作为一个整体,对其大容量的内容进行存储以及它的最牛掰的能力--检索.

首先在配置文件中做如下配置

elasticsearch:
  clusterName: aubin-cluster
  clusterNodes: 192.168.5.182:9300
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

   private String clusterName;

   private String clusterNodes;

    /**
     * 使用elasticsearch实现类时才触发
     *
     * @return
     */
   @Bean
    @ConditionalOnBean(value = EsLogServiceImpl.class)
   public TransportClient getESClient() {
      // 设置集群名字
      Settings settings = Settings.builder().put("cluster.name", this.clusterName).build();
      TransportClient client = new PreBuiltTransportClient(settings);
      try {
         // 读取的ip列表是以逗号分隔的
         for (String clusterNode : this.clusterNodes.split(",")) {
            String ip = clusterNode.split(":")[0];
            String port = clusterNode.split(":")[1];
            client.addTransportAddress(new TransportAddress(InetAddress.getByName(ip), Integer.parseInt(port)));
         }
      } catch (UnknownHostException e) {
         e.printStackTrace();
      }

      return client;
   }
}

再定义一个操作接口

public interface LogService {

   void save(Log log);

   Page<Log> findLogs(Map<String, Object> params);

}

具体实现类为

@Service
public class EsLogServiceImpl implements LogService, ApplicationContextAware {

   private static final Logger logger = LoggerFactory.getLogger(EsLogServiceImpl.class);

   private static final String INDEX = "index_logs";
   private static final String TYPE = "type_logs";

   @Autowired
   private TransportClient client;

   @Async
   @Override
   public void save(Log log) {
      if (log.getCreateTime() == null) {
         log.setCreateTime(new Date());
      }
      if (log.getFlag() == null) {
         log.setFlag(Boolean.TRUE);
      }
      logger.info("{}", log);

      String string = JSONObject.toJSONString(log);

      IndexRequestBuilder builder = client.prepareIndex(INDEX, TYPE).setSource(string, XContentType.JSON);
      builder.execute();
   }

   @Override
   public Page<Log> findLogs(Map<String, Object> params) {
      SearchRequestBuilder builder = client.prepareSearch().setIndices(INDEX).setTypes(TYPE);
      if (!CollectionUtils.isEmpty(params)) {
         BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

         // 用户名模糊匹配
         String username = MapUtils.getString(params, "username");
         if (StringUtils.isNoneBlank(username)) {
            queryBuilder.must(QueryBuilders.wildcardQuery("username", "*" + username + "*"));
         }

         // 模块精确匹配
         String module = MapUtils.getString(params, "module");
         if (StringUtils.isNoneBlank(module)) {
            queryBuilder.must(QueryBuilders.matchQuery("module", module));
         }

         String flag = MapUtils.getString(params, "flag");
         if (StringUtils.isNoneBlank(flag)) {
            Boolean bool = Boolean.FALSE;
            if ("1".equals(flag) || "true".equalsIgnoreCase(flag)) {
               bool = Boolean.TRUE;
            }
            queryBuilder.must(QueryBuilders.matchQuery("flag", bool));
         }

         // 大于等于开始日期,格式yyyy-MM-dd
         String beginTime = MapUtils.getString(params, "beginTime");
         if (StringUtils.isNoneBlank(beginTime)) {
            // 转化为0点0分0秒
            Long timestamp = toTimestamp(beginTime + "T00:00:00");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").from(timestamp));
         }

         // 小于等于结束日期,格式yyyy-MM-dd
         String endTime = MapUtils.getString(params, "endTime");
         if (StringUtils.isNoneBlank(endTime)) {
            // 转化为23点59分59秒
            Long timestamp = toTimestamp(endTime + "T23:59:59");
            queryBuilder.must(QueryBuilders.rangeQuery("createTime").to(timestamp));
         }

         if (queryBuilder != null) {
            builder.setPostFilter(queryBuilder);
         }
      }

      builder.addSort("createTime", SortOrder.DESC);

      PageUtil.pageParamConver(params, true);
      Integer start = MapUtils.getInteger(params, PageUtil.START);
      if (start != null) {
         builder.setFrom(start);
      }

      Integer length = MapUtils.getInteger(params, PageUtil.LENGTH);
      if (length != null) {
         builder.setSize(length);
      }

      SearchResponse searchResponse = builder.get();

      SearchHits searchHits = searchResponse.getHits();
      // 总数量
      Long total = searchHits.getTotalHits();

      int size = searchHits.getHits().length;
      List<Log> list = new ArrayList<>(size);
      if (size > 0) {
         searchHits.forEach(hit -> {
            String val = hit.getSourceAsString();
            list.add(JSONObject.parseObject(val, Log.class));
         });
      }

      return new Page<>(total.intValue(), list);
   }

   private Long toTimestamp(String str) {
      LocalDateTime localDateTime = LocalDateTime.parse(str);
      Date date = Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());

      return date.getTime();
   }

   private static ApplicationContext applicationContext = null;

   @Override
   public void setApplicationContext(ApplicationContext context) throws BeansException {
      applicationContext = context;
   }

   /**
    * 初始化日志es索引
    */
   @PostConstruct
   public void initIndex() {
      LogService logService = applicationContext.getBean(LogService.class);
      // 日志实现是否采用elasticsearch
      boolean flag = (logService instanceof EsLogServiceImpl);
      if (!flag) {
         return;
      }

      try {
         // 判断索引是否存在
         IndicesExistsResponse indicesExistsResponse = client.admin().indices()
               .exists(new IndicesExistsRequest(INDEX)).get();
         if (indicesExistsResponse.isExists()) {
            return;
         }
      } catch (InterruptedException e) {
         e.printStackTrace();
      } catch (ExecutionException e) {
         e.printStackTrace();
      }

      CreateIndexRequestBuilder requestBuilder = client.admin().indices().prepareCreate(INDEX);

      CreateIndexResponse createIndexResponse = requestBuilder.execute().actionGet();
      if (createIndexResponse.isAcknowledged()) {
         logger.info("索引:{},创建成功", INDEX);
      } else {
         logger.error("索引:{},创建失败", INDEX);
      }
   }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spring MVC的模板方法模式 顶

    模板方法模式是由抽象类或接口定义好执行顺序,由子类去实现,但无论子类如何实现,他都得按照抽象类或者接口定义好的顺序去执行。实例代码请参考 设计模式整理 ,Ser...

    算法之名
  • 如何遍历redis集合 顶

    当redis集合有大量的键值对的时候,如果使用smembers来获取就如同keys *在redis有大量键的时候一样,会产生性能问题,发生阻塞。所以建议使用ss...

    算法之名
  • ConcurrentHashMap 1.8原理解析

    JDK 1.8中,Hash家族有这么一些存在,HashMap,HashTable,LinkedHashMap,ConcurrentHashMap。这里面支持线程...

    算法之名
  • [Spring] 30个类手写 Spring Mini 版本系列(二)

    为了更深入的了解 Spring 的实现原理和设计思想,一直打算出个系列文章,从零开始重新学习 Spring。在[Spring] 30个类手写 Spring Mi...

    架构探险之道
  • vue 响应式原理

    简单点讲 vue 的响应式是通过 Object.defineProperty 和 观察者模式来实现的。 vue 初始化的时候 watcher 构造函数通过 O...

    大当家
  • Java实现的一个简单计算器,有字符分析功能

    需求:实现一个简单的计算器来分析一个简单的表达式字符串。 表达式字符串可能包含括号,+ +或减号,非负整数和空格。 例子:“1 + 1 = 2,(1)“= 1(...

    用户1289394
  • “超越权限”侵入计算机信息系统是否构成犯罪?

    超越授权权限进入计算机信息系统,其本质仍是违背了被害方的意愿,仍属于非法获取计算机信息系统数据罪中的侵入他人计算机信息系统的行为。

    周俊辉
  • C# 判断文件编码

    我们的项目中会包含有很多文件,但是可能我们没有注意到的,我们的文件的编码不一定是utf-8,所以可能在别人电脑运行时出现乱码。最近在做一个项目,这个项目可以把我...

    林德熙
  • 用Python在25行以下代码实现人脸识别

    在本文中,我们将看到一种使用Python和开放源码库开始人脸识别的非常简单的方法。

    Python知识大全
  • ReentrantLock中的unlock流程

    在调用到ReentrantLock的unlock方法的时候,无论公平锁与非公平锁都会调用到sync.release(1)方法。

    None_Ling

扫码关注云+社区

领取腾讯云代金券