前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch中将Doc根据A字段排序获得第一个Doc的B字段值的方法

Elasticsearch中将Doc根据A字段排序获得第一个Doc的B字段值的方法

作者头像
颇忒脱
发布2018-10-19 14:43:06
1K0
发布2018-10-19 14:43:06
举报

注:本文基于Elasticsearch 6.1.2编写

最近遇到这样一个需求,要通过Elasticsearch将Doc根据A字段降序,然后获得B字段的值,最终根据B字段的值再去做Pipeline Aggregation

先尝试了Max Aggregation,但是Max Aggregation只能获得A字段的最大值。

然后尝试了Top Hits Aggregation,但是Top Hits Aggregation的结果无法被Pipeline Aggregation使用。

最终尝试Scripted Metric Aggregation成功。下面举例说明

比如现在我们有一堆股票价格数据,我们现在需要获得股票每天的收盘价比前一天的差值(Delta)。下面先倒入一段股票数据,date字段代表时间戳,price字段代表当时的价格:

代码语言:javascript
复制
POST /_bulk

{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-01T10:00:00","price":10}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-01T10:30:00","price":15}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-02T10:00:00","price":20}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-02T10:30:00","price":19}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-03T10:00:00","price":30}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-03T10:30:00","price":35}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-04T10:00:00","price":40}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-04T10:30:00","price":20}
{"index":{"_index":"stock-price","_type":"data"}}
{"date":"2018-01-05T10:00:00","price":10}

先分解一下看这个查询如何实现:

  1. 把股票数据按照“天”分bucket,这个会用到Date Histogram Aggregation
  2. 获得每个bucket里的最后一次的价格数据,这个会用到Scripted Metric Aggregation
  3. 最后根据算每个bucket的差值,这个会用到Serial Differencing Aggregation

下面是查询代码:

代码语言:javascript
复制
GET /stock-price/_search

{
  "size": 0,
  "aggs": {
    "minute_histo": {
      "date_histogram": {
        "field": "date",
        "interval": "day"
      },
      "aggs": {
        "latest_price": {
          "scripted_metric": {
            "init_script": "params._agg.tmp_rs = ['latest_date': 0, 'latest_price' : -1];",
            "map_script": "def tmp_rs = params._agg.tmp_rs; boolean newer = doc['date'].value.millis > tmp_rs['latest_date']; if (newer) { tmp_rs['latest_date'] = doc['date'].value.millis; tmp_rs['latest_price'] = doc.price.value; }",
            "combine_script": "return params._agg.tmp_rs;",
            "reduce_script": "long rs_date = 0; long rs_price = -1; for (a in params._aggs) {  if (a == null) { continue; } boolean newer = a['latest_date'] > rs_date;   if (newer) {     rs_date = a['latest_date'];     rs_price = a['latest_price'];   } } return rs_price;"
          }
        },
        "delta_price": {
          "serial_diff": {
            "buckets_path": "latest_price.value",
            "lag": 1
          }
        }
      }
    }
  }
}

最后得到的结果是:

代码语言:javascript
复制
{
  ...
  "aggregations": {
    "minute_histo": {
      "buckets": [
        {
          "key_as_string": "2018-01-01T00:00:00.000Z",
          "key": 1514764800000,
          "doc_count": 2,
          "latest_price": {
            "value": 15
          }
        },
        {
          "key_as_string": "2018-01-02T00:00:00.000Z",
          "key": 1514851200000,
          "doc_count": 2,
          "latest_price": {
            "value": 19
          },
          "delta_price": {
            "value": 4.0
          }
        },
        {
          "key_as_string": "2018-01-03T00:00:00.000Z",
          "key": 1514937600000,
          "doc_count": 2,
          "latest_price": {
            "value": 35
          },
          "delta_price": {
            "value": 16.0
          }
        },
        {
          "key_as_string": "2018-01-04T00:00:00.000Z",
          "key": 1515024000000,
          "doc_count": 2,
          "latest_price": {
            "value": 20
          },
          "delta_price": {
            "value": -15.0
          }
        },
        {
          "key_as_string": "2018-01-05T00:00:00.000Z",
          "key": 1515110400000,
          "doc_count": 1,
          "latest_price": {
            "value": 10
          },
          "delta_price": {
            "value": -10.0
          }
        }
      ]
    }
  }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档