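WeChat official account: [中间件兴趣圈]. About the author: author of《RocketMQ技术内幕》.
This article begins our look at Elasticsearch bucket aggregations.
Bucket aggregations, unlike metrics aggregations, do not compute metrics over fields. Instead they create buckets of documents, and each bucket effectively defines a set of documents. Besides the buckets themselves, a bucket aggregation also computes and returns the number of documents that "fall into" each bucket.
Unlike metrics aggregations, bucket aggregations can hold sub-aggregations. These sub-aggregations are computed over the buckets created by their "parent" bucket aggregation.
ES bucket aggregations are the counterpart of GROUP BY in relational databases.
First, let us look at two parameters that bucket aggregations commonly use: interval and time_zone.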
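interval defines the width of each bucket. For dates it can be a calendar unit such as year, quarter, month, week, day, hour, minute, or second (the examples in this article use month).
For date fields, time_zone specifies the time zone used for bucketing. Its value can be an ISO 8601 UTC offset such as +01:00 or -08:00, or a time zone ID such as America/Los_Angeles.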
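To make the effect of time_zone more concrete, here is a minimal plain-Java sketch that truncates the same instant to the start of its day under different zones. It uses only java.time, not the Elasticsearch client; the class and method names are made up for illustration, and Elasticsearch does the equivalent rounding on the server side.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper for illustration; not part of the Elasticsearch API.
final class TimeZoneBucketDemo {

    // Returns the start of the calendar day that contains the instant,
    // as seen from the given time zone (offset such as "-08:00" or a zone ID).
    static ZonedDateTime dayBucket(Instant instant, String zone) {
        return instant.atZone(ZoneId.of(zone)).truncatedTo(ChronoUnit.DAYS);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2016-01-01T03:00:00Z");
        System.out.println(dayBucket(instant, "UTC"));
        System.out.println(dayBucket(instant, "-08:00"));
        System.out.println(dayBucket(instant, "America/Los_Angeles"));
    }
}
```

The same instant lands in the 2016-01-01 bucket in UTC but in the 2015-12-31 bucket at -08:00, which is why the choice of time_zone can move documents between daily or monthly buckets.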
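Histogram Aggregation. Date Histogram Aggregation is a special case of it.
It dynamically builds buckets over the values found in documents at a fixed interval and counts the documents that fall into each bucket. Each value is rounded down to its bucket key with the following function:
bucket_key = Math.floor((value - offset) / interval) * interval + offset, where interval must be a positive decimal (positive integers included) and offset must lie in [0, interval).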
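The rounding formula can be tried out directly in Java. The sketch below is only an illustration of the formula quoted above; the class name and the argument checks are my own additions, not Elasticsearch code.

```java
// Hypothetical helper that mirrors the bucket_key formula from the text.
final class HistogramBucketKey {

    static double bucketKey(double value, double interval, double offset) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (offset < 0 || offset >= interval) {
            throw new IllegalArgumentException("offset must be in [0, interval)");
        }
        return Math.floor((value - offset) / interval) * interval + offset;
    }

    public static void main(String[] args) {
        System.out.println(bucketKey(32, 5, 0)); // 30.0
        System.out.println(bucketKey(32, 5, 3)); // 28.0
        System.out.println(bucketKey(-1, 5, 0)); // -5.0
    }
}
```

With interval 5 and offset 0 the value 32 falls into the bucket whose key is 30; shifting the offset to 3 moves the bucket boundaries to ..., 23, 28, 33, ..., so the same value falls into bucket 28. Negative values are rounded down as well, which is why -1 belongs to bucket -5.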
The main supported parameters are as follows:
Detailed Java examples are given in the Date Histogram Aggregation section.
Date Histogram Aggregation.
{
    "aggs" : {
        "sales_over_time" : {
            "date_histogram" : {
                "field" : "date",
                "interval" : "month"
            }
        }
    }
}
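The corresponding Java example:
// Imports used by the Java examples in this article (high-level REST client).
// EsClient is a helper class from the author's project and is not shown here.
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

/**
 * Date histogram aggregation.
 */
public static void test_Date_Histogram_Aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        // Interval of the date histogram; this example buckets by month
        DateHistogramInterval interval = new DateHistogramInterval("1M");
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram")
                .field("createTime")
                .dateHistogramInterval(interval)
                // .format("yyyy-MM-dd") // formats the bucket key
                ;
        sourceBuilder.aggregation(aggregationBuild);
        // Return aggregation results only, no hits
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}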
The corresponding response:
{
    ... // regular response fields omitted
    "aggregations":{
        "date_histogram#createTime_histogram":{
            "buckets":[
                {
                    "key_as_string":"2015-12-01 00:00:00",
                    "key":1448928000000,
                    "doc_count":6
                },
                {
                    "key_as_string":"2016-01-01 00:00:00",
                    "key":1451606400000,
                    "doc_count":4
                }
            ]
        }
    }
}
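To see where keys such as 1448928000000 come from, the following plain-Java sketch buckets a few timestamps by calendar month in UTC and counts them. It is an illustration written for this article with made-up names and sample data, not how Elasticsearch is implemented, and unlike Elasticsearch it only produces buckets for months that actually contain data.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of the Elasticsearch API.
final class MonthlyHistogramDemo {

    // Maps a timestamp to the epoch-millis key of the first instant of its month (UTC).
    static long monthKey(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate()
                .withDayOfMonth(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    // Counts the timestamps per month bucket, sorted by bucket key.
    static Map<Long, Long> countByMonth(long... timestamps) {
        Map<Long, Long> buckets = new TreeMap<>();
        for (long ts : timestamps) {
            buckets.merge(monthKey(ts), 1L, Long::sum);
        }
        return buckets;
    }

    public static void main(String[] args) {
        long dec15 = Instant.parse("2015-12-15T10:30:00Z").toEpochMilli();
        long jan02 = Instant.parse("2016-01-02T08:00:00Z").toEpochMilli();
        long jan20 = Instant.parse("2016-01-20T23:59:59Z").toEpochMilli();
        System.out.println(countByMonth(dec15, jan02, jan20));
    }
}
```

The key 1448928000000 is simply 2015-12-01 00:00:00 UTC expressed in epoch milliseconds, and 1451606400000 is 2016-01-01 00:00:00 UTC, which matches the key and key_as_string pairs in the response.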
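The corresponding parameters were described in detail above and are not repeated here.
In addition to the parameters listed for Histogram Aggregation, the following extra parameters are supported. With keyed set to true, for example, the buckets are returned as an object keyed by each bucket's key_as_string instead of as an array: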
{
    "aggregations":{
        "date_histogram#createTime_histogram":{
            "buckets":{
                "2015-12-01 00:00:00":{
                    "key_as_string":"2015-12-01 00:00:00",
                    "key":1448928000000,
                    "doc_count":6
                },
                "2016-01-01 00:00:00":{
                    "key_as_string":"2016-01-01 00:00:00",
                    "key":1451606400000,
                    "doc_count":4
                }
            }
        }
    }
}
Date Range Aggregation. Each range is defined as [from, to), and both from and to accept date math expressions. A usage example follows; everything else works much like Date Histogram.
/**
 * Date range aggregation.
 */
public static void test_Date_range_Aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // One bucket per quarter of 2016. Each range is [from, to), so "to" is the
        // first day of the following quarter; the strings follow the format below.
        AggregationBuilder aggregationBuild = AggregationBuilders.dateRange("createTime_date_range")
                .field("createTime")
                .format("yyyy-MM-dd")
                .addRange("quarter_01", "2016-01-01", "2016-04-01")
                .addRange("quarter_02", "2016-04-01", "2016-07-01")
                .addRange("quarter_03", "2016-07-01", "2016-10-01")
                .addRange("quarter_04", "2016-10-01", "2017-01-01");
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}
Filter Aggregation. An aggregation can first narrow all documents down with a filter context and then run the aggregation over the documents that remain, for example:
POST /sales/_search?size=0
{
    "aggs" : {
        "t_shirts" : {
            "filter" : { "term": { "type": "t-shirt" } },
            "aggs" : {
                "avg_price" : { "avg" : { "field" : "price" } }
            }
        }
    }
}
The corresponding Java code:
/**
 * Filter aggregation.
 */
public static void test_filter_Aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Keep only documents with status = 1, then average num over them
        AggregationBuilder aggregationBuild = AggregationBuilders.filter("t_shirts", QueryBuilders.termQuery("status", "1"))
                .subAggregation(AggregationBuilders.avg("avg").field("num"));
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}
The response:
{
    ... // omitted
    "aggregations":{
        "filter#t_shirts":{
            "doc_count":2,
            "avg#avg":{
                "value":1
            }
        }
    }
}
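Conceptually, a filter aggregation with an avg sub-aggregation is "filter first, then average". The plain-Java sketch below mimics that on an in-memory list; the Sale class and its sample data are hypothetical and only serve the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class FilterAggregationDemo {

    // Minimal stand-in for an indexed document (hypothetical fields).
    static final class Sale {
        final String type;
        final double price;

        Sale(String type, double price) {
            this.type = type;
            this.price = price;
        }
    }

    // Mimics filter -> avg: keep only the matching documents, then average their price.
    static OptionalDouble avgPriceOfType(List<Sale> sales, String type) {
        return sales.stream()
                .filter(s -> s.type.equals(type))
                .mapToDouble(s -> s.price)
                .average();
    }

    static List<Sale> sample() {
        return Arrays.asList(
                new Sale("t-shirt", 100), new Sale("t-shirt", 200), new Sale("hat", 50));
    }

    public static void main(String[] args) {
        System.out.println(avgPriceOfType(sample(), "t-shirt"));
        System.out.println(avgPriceOfType(sample(), "bag"));
    }
}
```

When no document passes the filter, the average is empty, which corresponds to a bucket with doc_count 0 and a null avg value in an Elasticsearch response.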
Filters Aggregation. It defines a multi-bucket aggregation in which each bucket is associated with a filter. Each bucket collects all documents that match its associated filter.
public static void test_filters_aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Two filters, hence two buckets; num is averaged inside each bucket
        AggregationBuilder aggregationBuild = AggregationBuilders.filters("create_filters",
                        QueryBuilders.termQuery("status", 1),
                        QueryBuilders.termQuery("buyerId", 1))
                .subAggregation(AggregationBuilders.avg("avg").field("num"));
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}
The response:
{
    ... // omitted
    "aggregations":{
        "filters#create_filters":{
            "buckets":[
                {
                    "doc_count":2,
                    "avg#avg":{
                        "value":1
                    }
                },
                {
                    "doc_count":0,
                    "avg#avg":{
                        "value":null
                    }
                }
            ]
        }
    }
}
Note: each filter corresponds to one bucket.
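The "one filter, one bucket" rule can be sketched in plain Java with a map of named predicates. The Order class, the bucket names, and the sample data below are hypothetical; the point is only that every filter yields its own bucket and that a document is counted in every bucket whose filter it matches.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class FiltersAggregationDemo {

    // Hypothetical document with the two fields used by the filters.
    static final class Order {
        final int status;
        final int buyerId;

        Order(int status, int buyerId) {
            this.status = status;
            this.buyerId = buyerId;
        }
    }

    // One bucket per named filter; a document is counted in every bucket it matches.
    static Map<String, Long> countPerFilter(List<Order> orders,
                                            Map<String, Predicate<Order>> filters) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        filters.forEach((name, filter) ->
                buckets.put(name, orders.stream().filter(filter).count()));
        return buckets;
    }

    static Map<String, Long> demo() {
        List<Order> orders = Arrays.asList(new Order(1, 7), new Order(1, 8), new Order(2, 9));
        Map<String, Predicate<Order>> filters = new LinkedHashMap<>();
        filters.put("status_1", o -> o.status == 1);
        filters.put("buyer_1", o -> o.buyerId == 1);
        return countPerFilter(orders, filters);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A filter that matches nothing still produces a bucket, here with a count of 0, just as an Elasticsearch response still contains a bucket with doc_count 0.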
Global Aggregation. The global aggregation ignores all query conditions, as the following example shows:
POST /sales/_search?size=0
{
    "query" : {
        "match" : { "type" : "t-shirt" }
    },
    "aggs" : {
        "all_products" : {
            "global" : {},
            "aggs" : {
                "avg_price" : { "avg" : { "field" : "price" } }
            }
        },
        "t_shirts": { "avg" : { "field" : "price" } }
    }
}
The document set that all_products aggregates over is not the set matching the query "query" : {"match" : { "type" : "t-shirt" } }; it aggregates over all documents instead.
The corresponding Java example:
public static void test_global_aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // The global aggregation averages num over all documents, regardless of the query below
        AggregationBuilder aggregationBuild = AggregationBuilders.global("all_producers")
                .subAggregation(AggregationBuilders
                        .avg("num_avg_aggregation")
                        .field("num"));
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}
The corresponding response:
{
    "took":151,
    "timed_out":false,
    "_shards":{
        "total":5,
        "successful":5,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":39, // @1
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "global#all_producers":{
            "doc_count":1286, // @2
            "avg#num_avg_aggregation":{
                "value":1.3157076205287714
            }
        }
    }
}
Result @1: the total number of documents that match the query. Result @2: the number of documents that took part in the aggregation, which equals the total number of documents in the index.
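The difference between the query scope and the global scope can be sketched in plain Java as follows. The Doc class and its three sample documents are hypothetical; the sketch only contrasts "average over the hits" with "average over everything".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class GlobalAggregationDemo {

    // Hypothetical document with the fields used in the example.
    static final class Doc {
        final int sellerId;
        final double num;

        Doc(int sellerId, double num) {
            this.sellerId = sellerId;
            this.num = num;
        }
    }

    // Plays the role of the term query on sellerId.
    static List<Doc> matching(List<Doc> all, int sellerId) {
        return all.stream().filter(d -> d.sellerId == sellerId).collect(Collectors.toList());
    }

    // Average of num over the given documents (NaN when the list is empty).
    static double avgNum(List<Doc> docs) {
        return docs.stream().mapToDouble(d -> d.num).average().orElse(Double.NaN);
    }

    static List<Doc> sample() {
        return Arrays.asList(new Doc(24, 1), new Doc(24, 3), new Doc(7, 8));
    }

    public static void main(String[] args) {
        List<Doc> all = sample();
        List<Doc> hits = matching(all, 24);
        // A normal aggregation sees only the hits; a global one sees every document.
        System.out.println(hits.size() + " hits, avg " + avgNum(hits));
        System.out.println(all.size() + " docs, global avg " + avgNum(all));
    }
}
```

In this sample the query matches 2 of 3 documents, so the query-scoped average is 2.0 while the global average is 4.0, the same relationship as between hits.total and the global doc_count in a real response.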
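IP Range Aggregation. This is a range aggregation specific to the ip type. It is used like the other aggregations, so it is not covered again here.
Missing Aggregation. It counts the documents that lack a given field. A Java example:
AggregationBuilder aggregationBuild = AggregationBuilders.missing("missing_num_count")
        .field("num");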
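As a rough mental model, the missing aggregation counts documents in which the field is absent or has no value. The plain-Java sketch below represents documents as maps; that representation and the sample data are assumptions made for the illustration, not Elasticsearch internals.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class MissingAggregationDemo {

    // Counts documents where the field is absent or explicitly null.
    static long countMissing(List<Map<String, Object>> docs, String field) {
        return docs.stream().filter(d -> d.get(field) == null).count();
    }

    static List<Map<String, Object>> sample() {
        Map<String, Object> withNum = new HashMap<>();
        withNum.put("num", 3);
        Map<String, Object> nullNum = new HashMap<>();
        nullNum.put("num", null);
        Map<String, Object> noNum = new HashMap<>();
        noNum.put("status", 1);
        return Arrays.asList(withNum, nullNum, noNum);
    }

    public static void main(String[] args) {
        System.out.println(countMissing(sample(), "num"));
    }
}
```

Of the three sample documents, one has a value for num, one has an explicit null, and one has no num field at all, so two documents are counted as missing.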
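Range Aggregation. A multi-bucket aggregation based on a value source that lets the user define a set of ranges, each of which represents a bucket. During aggregation, the value extracted from each document is checked against every bucket range, and the document is placed in each bucket whose range it matches. Note that every range includes its from value and excludes its to value.
GET /_search
{
    "aggs" : {
        "price_ranges" : {
            "range" : {
                "field" : "price",
                "ranges" : [
                    { "to" : 100.0 },
                    { "from" : 100.0, "to" : 200.0 },
                    { "from" : 200.0 }
                ]
            }
        }
    }
}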
The corresponding Java example:
public static void test_range_aggregation() {
    RestHighLevelClient client = EsClient.getClient();
    try {
        SearchRequest searchRequest = new SearchRequest();
        searchRequest.indices("aggregations_index02");
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        // Three buckets over num: [0, 5), [5, 10) and [10, unbounded)
        AggregationBuilder aggregationBuild = AggregationBuilders.range("num_range_aggregation")
                .field("num")
                .addRange(0, 5)
                .addRange(5, 10)
                .addUnboundedFrom(10);
        sourceBuilder.aggregation(aggregationBuild);
        sourceBuilder.size(0);
        sourceBuilder.query(
                QueryBuilders.termQuery("sellerId", 24)
        );
        searchRequest.source(sourceBuilder);
        SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(result);
    } catch (Throwable e) {
        e.printStackTrace();
    } finally {
        EsClient.close(client);
    }
}
The response:
{
    // omitted
    "aggregations":{
        "range#num_range_aggregation":{
            "buckets":[
                {
                    "key":"0.0-5.0",
                    "from":0,
                    "to":5,
                    "doc_count":38
                },
                {
                    "key":"5.0-10.0",
                    "from":5,
                    "to":10,
                    "doc_count":0
                },
                {
                    "key":"10.0-*",
                    "from":10,
                    "doc_count":1
                }
            ]
        }
    }
}
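The boundary rule (from inclusive, to exclusive) is easy to get wrong, so here is a minimal plain-Java sketch of it. The Range class, the use of positive infinity for the unbounded range, and the sample values are my own choices for the illustration; only the three range keys are taken from the response.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class RangeBucketDemo {

    // A half-open range [from, to).
    static final class Range {
        final String key;
        final double from; // inclusive
        final double to;   // exclusive

        Range(String key, double from, double to) {
            this.key = key;
            this.from = from;
            this.to = to;
        }

        boolean contains(double value) {
            return value >= from && value < to;
        }
    }

    // The three ranges used in the Java example; the last one has no upper bound.
    static final List<Range> RANGES = Arrays.asList(
            new Range("0.0-5.0", 0, 5),
            new Range("5.0-10.0", 5, 10),
            new Range("10.0-*", 10, Double.POSITIVE_INFINITY));

    // Counts the values that fall into each range, in declaration order.
    static Map<String, Long> countPerRange(double... values) {
        Map<String, Long> buckets = new LinkedHashMap<>();
        for (Range r : RANGES) {
            long count = Arrays.stream(values).filter(r::contains).count();
            buckets.put(r.key, count);
        }
        return buckets;
    }

    public static void main(String[] args) {
        System.out.println(countPerRange(1, 4.9, 5, 10, 12));
    }
}
```

A value of exactly 5 is counted in the 5.0-10.0 bucket and not in 0.0-5.0, and a value of exactly 10 goes into 10.0-*.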
Range Aggregation supports nesting through sub-aggregations. The example from the official documentation is as follows:
GET /_search
{
    "aggs" : {
        "price_ranges" : {
            "range" : { // @1
                "field" : "price",
                "ranges" : [
                    { "to" : 100 },
                    { "from" : 100, "to" : 200 },
                    { "from" : 200 }
                ]
            },
            "aggs" : { // @2
                "price_stats" : {
                    "stats" : { "field" : "price" }
                }
            }
        }
    }
}
The range aggregation is defined first at @1, and the sub-aggregation at @2 is then run over the documents in each bucket. The response:
{
    ...
    "aggregations": {
        "price_ranges": {
            "buckets": [
                {
                    "key": "*-100.0",
                    "to": 100.0,
                    "doc_count": 2,
                    "price_stats": {
                        "count": 2,
                        "min": 10.0,
                        "max": 50.0,
                        "avg": 30.0,
                        "sum": 60.0
                    }
                },
                {
                    "key": "100.0-200.0",
                    "from": 100.0,
                    "to": 200.0,
                    "doc_count": 2,
                    "price_stats": {
                        "count": 2,
                        "min": 150.0,
                        "max": 175.0,
                        "avg": 162.5,
                        "sum": 325.0
                    }
                },
                {
                    "key": "200.0-*",
                    "from": 200.0,
                    "doc_count": 3,
                    "price_stats": {
                        "count": 3,
                        "min": 200.0,
                        "max": 200.0,
                        "avg": 200.0,
                        "sum": 600.0
                    }
                }
            ]
        }
    }
}
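The "bucket first, then aggregate inside each bucket" idea can be reproduced in plain Java with DoubleSummaryStatistics. The seven prices below are sample values I picked so that they are consistent with the response above; they are an assumption, not data taken from the article.

```java
import java.util.DoubleSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in; none of these types belong to the Elasticsearch API.
final class RangeStatsDemo {

    // Plays the role of the parent range aggregation: [*, 100), [100, 200), [200, *).
    static String rangeKey(double price) {
        if (price < 100) {
            return "*-100.0";
        }
        if (price < 200) {
            return "100.0-200.0";
        }
        return "200.0-*";
    }

    // Plays the role of the stats sub-aggregation: one statistics object per bucket.
    static Map<String, DoubleSummaryStatistics> statsPerRange(double... prices) {
        Map<String, DoubleSummaryStatistics> buckets = new TreeMap<>();
        for (double price : prices) {
            buckets.computeIfAbsent(rangeKey(price), k -> new DoubleSummaryStatistics())
                    .accept(price);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // Assumed sample prices, chosen to be consistent with the response above.
        statsPerRange(10, 50, 150, 175, 200, 200, 200)
                .forEach((key, stats) -> System.out.println(key + " -> " + stats));
    }
}
```

Each bucket carries its own count, min, max, avg, and sum, which is exactly the shape of the price_stats object nested inside every bucket of the response.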
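This article covered ES bucket aggregations in detail and provided Java examples. The next article will focus on one particular bucket aggregation, the terms aggregation.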