首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Druid和ES查询结果通用解析方法

Druid和ES查询结果通用解析方法

作者头像
actionzhang
发布2022-11-30 17:10:42
8420
发布2022-11-30 17:10:42
举报

做数据的同学相信大家对Druid和Es都不陌生,Druid可以说是一款基于时序的查询引擎,支持数据实时摄入,在数据摄入前指定维度和指标,提供基于时间层面的预聚合,Druid会把一个数据点当做一个实际发生的事实,在数据摄入后就不能修改。常被应用于一些实时的场景,比如对数据实时分时间段分组聚合。ES同样是一款高效的查询引擎,支持数据的批量导入,同样支持数据实时的摄入,也支持数据批量导入,相比于Druid不仅对聚合高度支持,同时兼顾强大的搜索能力,ES主要是基于对摄入数据进行分词,同时构建索引增加查询聚合的速度。通常我一般将ES用作一些离线的场景,对离线场景支持指标的快速查询和聚合。

Druid实践

        Druid提供良好的Rest风格的访问方式,方便开发者快速上手,其提供的查询与聚合的方式多种多样,一般我们最常用的查询是select,聚合方式是groupBy,具体使用方式大家可以上网百度,这里主要介绍对于查询结果的解析。Druid返回的数据格式一般是一个JSON格式的数组,数组的每一个元素都是一个时间点的数据,如下图:

[
    {
        "version":"v1",
        "timestamp":"2018-07-08T08:05:00.000Z",
        "event":{
            "dim1":"test1",
            "dim2":"test2",
            "sumMetric1":12345,
            "sumMetric2":14567
        }
    }
]

查询结果主要组成部分有两个,一个是timestamp,另一个是event,timestamp代表事实发生的时间,event主要包含聚合的维度和指标。如上图dim1和dim2是聚合的维度,metric是聚合的指标。显然druid的查询结果是平铺展示的,不论是普通的select还是groupby,但是这样的展示形式不适合于groupby的展示方式,比如dim1的组成值有“d11”和“d12”,而dim2的组成值有“d21”和“d22”,那么查询结果在同一个时间点有四条展示数据,[d11+d21,d11+d22,d12+d21,d12+d22] 这样显然不太便于查看,我们更希望的展示结果可能如下图,在查询结果中,按照聚合查询的结构展示,这样更方便看清数据,重点在于一个时间点,在数组中只有一个数据点。

[
      {
          "timestamp":"2018-07-08T08:05:00.000Z",
          "event":{
              "d11":{
                 "d21":{
                    "metric":123
                 },
                 "d22":{
                    "metric":234
                 }
              },
              "d12":{
                 "d21":{
                    "metric":235
                 },
                 "d22":{
                    "metric":345
                 }
              }
          }
      }
]

为了将平铺的结果处理成为结果化的结果,需要在构建查询的时候,就把聚合方式记录下来,我以一个简单的例子来,解释聚合的存储,如下图一个简单的聚合,对dim1和dim2分组,组内进行metric1和metric2的SUM聚合,这样的聚合方式可以使用一个树来存储整个聚合方式,如图所示,顶层聚合是group by dim1,其子聚合是group by dim2,接下来的子聚合分别是sum metric1和sum metric2,所以聚合可以用一个类来表示--Aggregation,聚合类型(aggType)可以是group by ,sum,max,min,count,avg,聚合字段(aggField)为了表明在哪个字段上聚合的,同时为了支持给聚合起别名,增加alias字段,最终要的就是利用一个list存储子聚合。

SELECT 
      SUM(metric1),
      SUM(metric2)
FROM test
GROUP BY
      dim1,dim2
/**
**  表示聚合的类
**/
public class Aggregation{
 
     private String aggType;//聚合类型

     private String aggField;//聚合字段

     private String alias;//聚合名字

     private List<Aggregation> subAggs = new LinkedList<>();//子聚合

     public static Aggregation buildAggregation(String aggType,String aggField,String alias){
         return new Aggregation(aggType,aggField,alias);
     }

     private Aggregation(String aggType,String aggField,String alias){
          this.aggType = aggType;
          this.aggField = aggField;
          this.alias = alias;
     }

     public void subAggs(Aggregation ... subAggs){
         for(Aggregation aggregation:subAggs){
             this.subAggs.add(subAggs);
         }
     }

     setters/getters...
}

有了能够代表聚合的类,那么上图在查询的时候,构建聚合的代码可以像下面这样写(如果封装得好的话,这些代码可以埋在构建实际查询的代码中):

Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
groupByDim1.subAggs(groupByDim2);
groupByDim2.subAggs(sumMetric1,sumMetic2);

那么接下来就是查询结果的解析了,有了上面的构建的聚合,就方便对查询结果做解析了,下面我大概写个解析思路:

public Map<String,JSONObject> parseDruidResult(Aggregation agg,List<JSONObject> searchResult){
     Map<String,JSONObject> tempResult = new TreeMap<>();
     for(JSONObject eventInfo:searchResult){
         String timestamp = eventInfo.getString("timestamp");
         JSONObject structedResult = null;   
         if(!tempResult.containsKey(timestamp)){
             structedResult = new JSONObject();     
             tempResult.put(timestamp,structedResult);
         }else{
             structedResult = tempResult.get(timestamp);
         }
         assembleResult(agg,structedResult,eventInfo.getJSONObject("event"));
     }
   return tempResult;

}

public void assembleResult(Aggregation aggregation,JSONObject structedResult,JSONObject event)){
    if(!isLeaf(aggregation.getAggType)){//isLeaf 判断是否是聚合的叶子节点,这里sum,count,max,min,avg没有子聚合肯定是叶子节点,其他都是非叶节点
        JSONObject subStructedResult = new JSONObject();
        structedResult.put(event.get(aggregation.getAggField()),subStructedResult);
        for(Aggregation subAgg:aggregation.getSubAggs){
           assembleResult(subAgg,subStructedResult,event);
        }
    }else{
        structedResult.put(aggregation.getAlias(),event.get(aggregation.getAlias()));
    }
}

其实解析的思路就是,根据树形的聚合结果来解析平铺的查询结果,以满足结构化查询的需求。以上代码值得说明的有两点,1,在Druid的查询结果中,维度是以field的名称放在event中,指标之一alias的名称放在event中,而对维度的聚合对应非叶节点的聚合,对指标的聚合对应叶节点的聚合,所以在代码中,对应取不同的数据。2,查询结果为了方便处理以map来存放解析结果的,key是timestamp,value是这个时间点的结构化结果,为了转换成为我们想要数组形式,可以遍历map,为了时间有序,可以用TreeMap存放中间结果。

ES实践

      ES对外也提供良好的RestApi查询方式,并且新版client不需要我们拼接json去查询或解析查询结果,可以使用java Api方便解析,这里我们就是使用新版的java Api来查询ES,ES相对于Druid,聚合结果不是平铺的,而是结果化的,但是这样的结构化结果,甚至比平铺的结果还复杂,需要我们通过java代码一层层解析出来。假如执行一个复杂的聚合,结果解析可能非常复杂,甚至难以排查出现的错误,举个例子,假如一个复杂的聚合(其实实际当中也不算复杂)如下图:

图中dim1和dim2为两个维度,dim2是比dim1更低的一个维度,分别对这两个维度进行sum,聚合两个指标metric1和metric2,这个例子很简单,比如dim1代表年,dim2代表月份,metric1是点击量,metric2是阅读量,那么上面的聚合树就代表,分别按照月份和年份统计点击量和阅读量。构建上面聚合树的代码可以是下面这样的:

Aggregation sumMetric1 = Aggregation.buildAggregation("sum","metric1","sumMetric1");
Aggregation sumMetric2 = Aggregation.buildAggregation("sum","metric2","sumMetric2");
Aggregation groupByDim1 = Aggregation.buildAggregation("groupBy","dim1","groupByDim1");
Aggregation groupByDim2 = Aggregation.buildAggregation("groupBy","dim2","groupByDim2");
groupByDim1.subAggs(groupByDim2,sumMetric1,sumMetric2);
groupByDim2.subAggs(sumMetric1,sumMetric2);

通用的解析方法还要依赖于聚合树,我的思路如下:

public void parseEsResult(Aggregation agg,Aggregations esOriginAggs,JSONObject result){
    if(isLeaf(agg.getAggType())){
        //叶节点的解析方式
        switch(agg.getAggType){
           case "sum":
              ParsedSum parsedSum = esOriginAggs.get(agg.getAlias());
              result.put(agg.getAlias(),parsedSum.getValue());
            break;
           case "count":
              ParsedValueCount parsedValueCount = esOriginAggs.get(agg.getAlias());
              result.put(parsedValueCount.getValue())
           case "max":
              ParsedMax parsedMax = esOriginAggs.get(agg.getAlias());
              result.put(agg.getAlias(),parseMax.getValue());
           ....//还有多种指标聚合方式,只写几个常用的,其他可以自己发挥
        }
    }else{
        //非叶节点的解析方式
        switch(agg.getAggType()){
           case "groupBy":
              //注意es中使用Terms来实现group by
              ParsedTerms parsedTerms = esOriginAggs.get(agg.getAlias());
              for(Bucket termsBucket:parsedTerms.getBuckets()){
                  JSONObject subResult = new JSONObject();
                  for(Aggregation subAgg:agg.getSubAggs()){
                     parseEsResult(subAgg,termsBucket.getAggregations(),subResult);
                  }
                  result.put(termsBucket.getKey(),subResult)
              }
           break;//还有多种解析方式,只写个常用的,其他可以自己发挥
        }
    }
}

这样的解析方式,假设dim1由d11,d12组成,dim2由d21,d22组成,那么上述通用代码的解析的结果如下,这样一套通用的代码可以防止重复实现解析es结果的代码,造成代码冗余。

{
   "d11":{
      "d21":{
         "sumMetric1":1234,
         "sumMetric2":3456
      },
      "d22":{
         "sumMetric1":345,
         "sumMetric2":456
      },
      "sumMetric1":678,
      "sumMetric2":7890
    },
   "d12":{
      "d21":{
         "sumMetric1":1234,
         "sumMetric2":3456
      },
      "d22":{
         "sumMetric1":345,
         "sumMetric2":456
      },
      "sumMetric1":678,
      "sumMetric2":7890
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-07-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Druid实践
  • ES实践
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档