ElasticSearch之Java Api聚合分组实战

最近有个日志收集监控的项目采用的技术栈是ELK+JAVA+Spring,客户端语言使用的是Java,以后有机会的话可以试一下JavaScript+Nodejs的方式,非常轻量级的组合,只不过不太适合服务化的工程,Kibana充当可视化层,功能虽然非常强大和灵活,但是需要业务人员懂Lucene的查询语法和Kibana的Dashboard仪表盘自定义功能才能玩的转,所以Kibana面向专业的开发人员和运维人员比较良好,但面向业务人员则稍微有点难度,我们这边就使用Java进行二次开发,然后前端定义几个业务人员关注的图表,然后把后端查询的数据,按照一定的维度放进去即可。 基础环境: (1)ElasticSearch1.7.2 (2)Logstash2.2.2 (3)Kibana4.1.2 (3)JDK7 (4)Spring4.2 使用到的技术点: (1)ElasticSearch的查询 (2)ElasticSearch的过滤 (3)ElasticSearch的日期聚合 (4)ElasticSearch的Terms聚合 (5)ElasticSearch的多级分组 (6)ElasticSearch+Logstash的时区问题 直接上代码:

Java代码

  1. /**
  2. * Created by qindongliang on 2016/4/6.
  3. */
  4. @Repository("esDaoImpl")
  5. public class ESDaoImpl implements ESDao {
  6. private static Logger log= LoggerFactory.getLogger(ESDaoImpl.class);
  7. @Autowired
  8. private ESConf esConf;
  9. @Resource(name = "client")
  10. private Client client;
  11. @Override
  12. public MonitorCount count() {
  13. MonitorCount count=new MonitorCount();
  14. //今天的数量
  15. count.setToday_count(customCount(false,"*:*"));
  16. //今天的入库量
  17. count.setToday_store_count(customCount(false,"-save:1"));
  18. //所有的总量
  19. count.setTotal_count(customCount(true,"*:*"));
  20. //所有的入库总量
  21. count.setTotal_store_count(customCount(true,"-save:1"));
  22. return count;
  23. }
  24. private long customCount(boolean isQueryAll, String queryString){
  25. try {
  26. //今天的开始时间 比如2016-04-01 00:00:00
  27. long today_start = TimeTools.getDayTimeStamp(0);
  28. //今天的结束时间 也就是明天的开始时间 比如2016-04-02 00:00:00
  29. //一闭区间一开区间即得到一天的统计量
  30. long today_end=TimeTools.getDayTimeStamp(1);
  31. StringBuffer fq = new StringBuffer();
  32. fq.append("@timestamp:")
  33. .append(" [ ")
  34. .append(today_start)
  35. .append(" TO ")
  36. .append(today_end)
  37. .append(" } ");
  38. //构建查询请求,使用Lucene高级查询语法
  39. QueryBuilder query=QueryBuilders.queryStringQuery(queryString);
  40. //构建查询请求
  41. SearchRequestBuilder search = client.prepareSearch("crawl*").setTypes("logs");
  42. //非所有的情况下,设置日期过滤
  43. if(isQueryAll){
  44. search.setQuery(query);//查询所有
  45. }else {//加上日期过滤
  46. search.setQuery(QueryBuilders.filteredQuery(query, FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString()))));
  47. }
  48. SearchResponse r = search.get();//得到查询结果
  49. long hits = r.getHits().getTotalHits();//读取命中数量
  50. return hits;
  51. }catch (Exception e){
  52. log.error("统计日期数量出错!",e);
  53. }
  54. return 0;
  55. }
  56. @Override
  57. public List<GroupCount> query(Condition condition) {
  58. return grouyQuery(condition);
  59. }
  60. /***
  61. * @param c 查询的条件
  62. * @return 查询的结果
  63. */
  64. private List<GroupCount> grouyQuery(Condition c){
  65. //封装结果集
  66. List<GroupCount> datas=new ArrayList<>();
  67. //组装分组
  68. DateHistogramBuilder dateAgg = AggregationBuilders.dateHistogram("dateagg");
  69. //定义分组的日期字段
  70. dateAgg.field("@timestamp");
  71. //按天分组
  72. if(CountType.EACH_DAY==(c.getType())) {
  73. dateAgg.interval(DateHistogram.Interval.DAY);
  74. dateAgg.timeZone("+8:00");
  75. dateAgg.format("yyyy-MM-dd");
  76. //按小时分组
  77. }else if(CountType.EACH_HOUR==c.getType()){
  78. dateAgg.interval(DateHistogram.Interval.HOUR);
  79. //按小时分组,必须使用这个方法,不然得到的结果不正确
  80. dateAgg.postZone("+8:00");
  81. dateAgg.format("yyyy-MM-dd HH");
  82. //无效分组
  83. }else{
  84. throw new NullPointerException("无效的枚举类型");
  85. }
  86. //二级分组,统计入库的成功失败量 0 1 2 , 1为不成功
  87. dateAgg.subAggregation(AggregationBuilders.terms("success").field("save"));
  88. //查询过滤条件
  89. StringBuffer fq = new StringBuffer();
  90. //过滤时间字段
  91. fq.append(" +@timestamp:")
  92. .append(" [ ")
  93. .append(c.getStart().getTime())
  94. .append(" TO ")
  95. .append(c.getEnd().getTime())
  96. .append(" } ");
  97. //过滤一级
  98. if(StringUtils.isNotEmpty(c.getT1())){
  99. fq.append(" +t1:").append(c.getT1());
  100. }
  101. //过滤二级
  102. if(StringUtils.isNotEmpty(c.getT2())){
  103. fq.append(" +t2:").append(c.getT2());
  104. }
  105. //过滤三级
  106. if(StringUtils.isNotEmpty(c.getT3())){
  107. fq.append(" +t3:").append(c.getT3());
  108. }
  109. //过滤url
  110. if(StringUtils.isNotEmpty(c.getSourceUrl())){
  111. //对url进行转义,防止查询出现错误
  112. fq.append(" +url:").append(QueryParserBase.escape(c.getSourceUrl()));
  113. }
  114. //过滤省份编码
  115. if(StringUtils.isNotEmpty(c.getProvinceCode())){
  116. fq.append(" +pcode:").append(c.getProvinceCode());
  117. }
  118. //过滤入库状态
  119. if(c.getSavaState()!=null){
  120. fq.append(" +save:").append(c.getSavaState().getCode());
  121. }
  122. //过滤http状态码
  123. if(c.getWebsiteState()!=null){
  124. if(!c.getWebsiteState().getCode().equals("-1")) {
  125. fq.append(" +httpcode:").append(c.getWebsiteState().getCode());
  126. }else{
  127. fq.append(" -httpcode:").append("(0 110 200)");
  128. }
  129. }
  130. //过滤配置configid
  131. if(StringUtils.isNotEmpty(c.getConfigId())){
  132. fq.append(" +cid:").append(c.getConfigId());
  133. }
  134. //查询索引
  135. SearchRequestBuilder search=client.prepareSearch("crawl*").setTypes("logs");
  136. //组装请求
  137. search.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
  138. FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString())
  139. .defaultOperator(QueryStringQueryBuilder.Operator.AND)
  140. ))).addAggregation(dateAgg);
  141. //获取查询结果
  142. SearchResponse r = search.get();//得到查询结果
  143. //获取一级聚合数据
  144. Histogram h=r.getAggregations().get("dateagg");
  145. //得到一级聚合结果里面的分桶集合
  146. List<DateHistogram.Bucket> buckets = (List<DateHistogram.Bucket>) h.getBuckets();
  147. //遍历分桶集
  148. for(DateHistogram.Bucket b:buckets){
  149. //读取二级聚合数据集引用
  150. Aggregations sub = b.getAggregations();
  151. //获取二级聚合集合
  152. StringTerms count = sub.get("success");
  153. GroupCount groupCount=new GroupCount();
  154. //设置x轴分组日期
  155. groupCount.setGroupKey(b.getKey());
  156. //设置指定分组条件下入库总量
  157. groupCount.setTotal_count(b.getDocCount());
  158. //读取指定分组条件下不成功的数量
  159. long bad_count=count.getBucketByKey("1")==null?0:count.getBucketByKey("1").getDocCount();
  160. //设置指定分组条件下成功的入库量
  161. groupCount.setTotal_store_count(b.getDocCount()-bad_count);
  162. //计算成功率
  163. groupCount.setSuccess_rate(groupCount.getTotal_store_count()*1.0/groupCount.getTotal_count());
  164. //添加到集合里面
  165. datas.add(groupCount);
  166. }
  167. return datas;
  168. }
  169. }

总结: (1)关于时区的问题,目前发现在测试按小时,按天分组统计的时候,时区使用的方法不是一致的,而postZone这个方法,在1.5版本已经废弃,说是使用timeZone替代,但经测试发现在按小时分组的时候,使用timeZone加8个时区的并没生效,后续看下最新版本的ElasticSearch是否修复。 (2)使用Terms的聚合分组时,这个字段最好是没有分过词的,否则大量的元数据返回,有可能会发生OOM的异常 (3)在不需要评分排名查询的场景中,尽量使用filter查询,elasticsearch会缓存查询结果,从而能大幅提高检索性能 今天先总结这么多,后续有空再关注下 (1)elasticsearch中的Aggregations和Facet的区别以及对比Solr中的Group和Facet的区别 (2)在不同的聚合渠道中多级分组中是组内有序还是全局有序

原文发布于微信公众号 - 我是攻城师(woshigcs)

原文发表时间:2016-04-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏MasiMaro 的技术博文

windows 驱动开发入门——驱动中的数据结构

最近在学习驱动编程方面的内容,在这将自己的一些心得分享出来,供大家参考,与大家共同进步,本人学习驱动主要是通过两本书——《独钓寒江 windows安全编程》 和...

1392
来自专栏lgp20151222

整理代码,将一些曾经用过的功能整合进一个spring-boot

由于本人的码云太多太乱了,于是决定一个一个的整合到一个springboot项目里面。

2203
来自专栏Seebug漏洞平台

“盲”逆向:iOS 应用 Blind 寻踪

原文地址:《"BLIND" Reversing - A Look At The Blind iOS App》

3827
来自专栏芋道源码1024

MyBastis 三种批量插入方式的性能比较

数据库使用的是sqlserver,JDK版本1.8,运行在SpringBoot环境下

3833
来自专栏Java架构解析

深入理解Java中的底层阻塞原理及实现

Information Technology Solutions as a Presentation

550
来自专栏ml

HDUOJ-------2149Public Sale

Public Sale Time Limit: 1000/1000 MS (Java/Others)    Memory Limit: 32768/32768 ...

2938
来自专栏人工智能LeadAI

实时Android语音对讲系统架构

本文属于Android局域网内的语音对讲项目(https://github.com/yhthu/intercom)系列,《通过UDP广播实现Android局域网...

1.3K4
来自专栏我的小碗汤

19 个很有用的 ElasticSearch 查询语句 篇一

为了演示不同类型的 ElasticSearch 的查询,我们将使用书文档信息的集合(有以下字段:title(标题), authors(作者), summary(...

2K5
来自专栏落影的专栏

Audio Unit播放aac/m4a/mp3等文件

前言 相关文章: 使用VideoToolbox硬编码H.264 使用VideoToolbox硬解码H.264 使用AudioToolbox编码AAC 使...

91310
来自专栏linux驱动个人学习

高通Audio中ASOC的codec驱动(二)

继上一篇文章:高通Audio中ASOC的machine驱动(一) ASOC的出现是为了让codec独立于CPU,减少和CPU之间的耦合,这样同一个codec驱动...

9356

扫码关注云+社区

领取腾讯云代金券