前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch探索:Pipeline API

Elasticsearch探索:Pipeline API

作者头像
HLee
修改2021-10-13 17:52:06
1K0
修改2021-10-13 17:52:06
举报
文章被收录于专栏:房东的猫房东的猫

简介

官方地址:https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html

在 Elasticsearch 5.0 之前,如果我们想在将文档索引到 Elasticsearch 之前预处理文档,那么唯一的方法是使用 Logstash 或以编程方式/手动预处理它们,然后将它们索引到 Elasticsearch。 Elasticsearch 缺乏预处理/转换文档的能力,它只是按原样索引文档。 但是,在 Elasticsearch 5.x 之后引入一个名为 ingest node 的功能,为 Elasticsearch 本身的文档预处理和丰富之前提供了一个轻量级的解决方案。

当我们的数据进入到 Elastic 集群中,并指定需要用到的 Pipeline,那么 Elasticsearch 中的 ingest node 将会帮我们安装规定的 processor 顺序来执行对数据的操作和处理。这在某种程度上方便了我们许多对集群的部署。如果我们单独部署一个 Logstash 有时没有那么多的灵活性。我们可以通过编程的方式随时修改这个 pipeline。

如果使用默认配置实现 Elasticsearch 节点,则默认情况下将启用 master,data 和 ingest(即,它将充当主节点,数据节点和提取节点)。 要在节点上禁用 ingest,请在 elasticsearch.yml 文件中配置以下设置:

代码语言:javascript
复制
node.ingest: false

ingest 节点可用于在对文档执行实际索引之前预处理文档。 此预处理通过截取批量和索引请求的提取节点执行,它将转换应用于数据,然后将文档传递回索引或批量 API。 随着新的提取功能的发布,Elasticsearch 已经取出了 Logstash 的过滤器部分,以便我们可以在 Elasticsearch 中处理原始日志。

要在索引之前预处理文档,我们必须定义pipeline(其中包含称为处理器的步骤序列,用于转换传入文档)。 要使用 pipeline,我们只需在索引或批量请求上指定 pipeline 参数,以告诉提取节点使用哪个 pipeline:

代码语言:javascript
复制
POST my_index/my_type?pipeline=my_pipeline_id 
{
  "key": "value"
}

定义 pipeline

pipeline 定义了一系列处理器。 每个处理器以某种方式转换文档。 每个处理器按照在 pipeline 中定义的顺序执行。 pipeline 由两个主要字段组成:description 和 processor 列表。

  • description 参数是一个非必需字段,用于存储一些描述/管道的用法;
  • processor 参数可以列出处理器以转换文档
代码语言:javascript
复制
{
  "description": "...",
  "processors": [ ...]
}

ingest 节点有大约20个内置 processor,包括 gsub,grok,转换,删除,重命名等。 这些可以在构建管道时使用。 除了内置processor 外,还可以使用提取附件(如 ingest attachment,ingetst geo-ip 和 ingest user-agent)等提取插件,并可在构建 pipeline 时使用。 这些插件在默认情况下不可用,可以像任何其他 Elasticsearch 插件一样进行安装。

Pipeline 以 cluster 状态存储,并且立即传播到所有 ingest node。 当 ingest node 接收到新 pipeline 时,它们将以内存 pipeline 表示形式更新其节点,并且 pipeline 更改将立即生效。

Ingest APIs

 ingest 节点提供一组称为 ingest API 的 API,可用于定义,模拟,删除或查找有关 pipeline 的信息。 摄取 API 端点是 _ingest。

Put pipeline API

此 API 用于定义新 pipeline。 此 API 还用于添加新 pipeline 或更新现有 pipeline。

我们来看一个例子吧。 如下面的代码所示,我们定义了一个名为 firstpipeline 的新 pipeline,它将消息字段中的值转换为大写

代码语言:javascript
复制
PUT _ingest/pipeline/firstpipeline
{
  "description": "uppsercase the incoming vlaue in the message filed",
  "processors": [
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

Response:
{
  "acknowledged" : true
}
代码语言:javascript
复制
PUT my_pipeline/_doc/1?pipeline=firstpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

GET my_pipeline/_doc/1

Response:
{
  "_index" : "my_pipeline",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "name" : "pipeline",
    "message" : "THIS IS SO COOL!"
  }
}
我们可以看到我们的 message 已经都变成大写的了。

创建管道时,可以定义多个处理器,执行顺序取决于定义中定义的顺序。 让我们看一个这样的例子。 如下面的代码所示,我们创建了一个名为 secondpipeline 的新管道,它转换 “message” 字段中存在的大写值,并将 “message” 字段重命名为 “data”。 它创建一个名为 “label” 的新字段,其值为 testlabel:

代码语言:javascript
复制
PUT _ingest/pipeline/secondpipeline
{
  "description": "uppercase the incoming value in the message field",
  "processors": [
    {
      "uppercase": {
        "field": "message",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "message",
        "target_field": "data",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "label",
        "value": "testlabel",
        "override": false
      }
    }
  ]
}
代码语言:javascript
复制
PUT my_pipeline/_doc/1?pipeline=secondpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

GET my_pipeline/_doc/1

Response:
{
  "_index" : "my_pipeline",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "data" : "THIS IS SO COOL!",
    "name" : "pipeline",
    "label" : "testlabel"
  }
}

通过上面的例子,我们可以看到我们之前的 message 项不见了,取而代之的是 data,同时它里面的字符都变成大写的了。另外,它也新增加了一个叫做 label 的项,并且它的值被设置为 testlabel。

提示:如果缺少处理器中使用的字段,则处理器将抛出异常,并且不会对文档编制索引。 为了防止处理器抛出异常,我们可以利用  “ignore_failure”:true 参数。

Get pipeline API

此 API 用于检索现有 pipeline 的定义。 使用此 API,可以找到单个 pipeline 定义的详细信息或查找所有 pipeline 的定义。

查找所有 pipeline 定义的命令是:

代码语言:javascript
复制
GET _ingest/pipeline

{
    "firstpipeline":{
        "description":"uppsercase the incoming vlaue in the message filed",
        "processors":[
            {
                "uppercase":{
                    "field":"message"
                }
            }
        ]
    },
    "secondpipeline":{
        "description":"uppercase the incoming value in the message field",
        "processors":[
            {
                "uppercase":{
                    "field":"message",
                    "ignore_failure":true
                }
            },
            {
                "rename":{
                    "field":"message",
                    "target_field":"data",
                    "ignore_failure":true
                }
            },
            {
                "set":{
                    "field":"label",
                    "value":"testlabel",
                    "override":false
                }
            }
        ]
    }
}

要查找现有 pipeline 的定义,请将管道 ID 传递给 get 管道 api。 以下是查找名为 secondpipeline 的 pipeline 定义的示例:

代码语言:javascript
复制
GET _ingest/pipeline/secondpipeline

也可以使用 filter_path 来获取 pipeline 的部分内容,比如:

代码语言:javascript
复制
GET _ingest/pipeline/secondpipeline?filter_path=*.processors.uppercase

Response:
{
  "secondpipeline" : {
    "processors" : [
      {
        "uppercase" : {
          "field" : "message",
          "ignore_failure" : true
        }
      }
    ]
  }
}
返回 processors 中的 uppercase 内容。

Del pipeline API

删除管道 API 按 ID 或通配符匹配删除 pipeline。 以下是删除名为 firstpipeline 的 pipeline 的示例:

代码语言:javascript
复制
DELETE _ingest/pipeline/firstpipeline

这样 firstpipeline 就被删除了,再次执行获取firstpipeline时,返回为空。

由于 pipeline 是群集级存储而被保存在每个节点的内存中,并且 pipeline 始终在 ingest node中运行,因此最好在群集中保留需要的 pipeline,而删除那些不需要的 pipeline。

模拟 pipeline API

此 pipeline API 可用于根据请求正文中提供的文档集模拟 pipeline 的执行。 可以指定要对提供的文档执行的现有 pipeline,或者在请求的主体中提供 pipeline 定义。 要模拟 ingest pipeline,请将 “_simulate” 端点添加到 pipeline API。

以下是模拟现有 pipeline 的示例:

代码语言:javascript
复制
POST _ingest/pipeline/secondpipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "name": "pipeline",
        "message": "this is so cool!"
      }
    },
    {
      "_source": {
        "name": "nice",
        "message": "this is nice!"
      }
    }
  ]
}

Response:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "name" : "pipeline",
          "label" : "testlabel",
          "data" : "THIS IS SO COOL!"
        },
        "_ingest" : {
          "timestamp" : "2020-12-29T07:29:38.555Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "name" : "nice",
          "label" : "testlabel",
          "data" : "THIS IS NICE!"
        },
        "_ingest" : {
          "timestamp" : "2020-12-29T07:29:38.555Z"
        }
      }
    }
  ]
}

引用 pipeline

根据 Elastic 的官方文档 https://www.elastic.co/guide/en/elasticsearch/reference/current/pipeline-processor.html,你甚至可以在 pipeline processor 中引用已有的 pipeline。比如:

代码语言:javascript
复制
PUT _ingest/pipeline/pipelineA
{
  "description": "inner pipeline",
  "processors": [
    {
      "set": {
        "field": "inner_pipeline_set",
        "value": "inner"
      }
    }
  ]
}

我们先定义叫做 pipelineA 的 pipeline。然后,我们在定义另外一个 pipeline:

代码语言:javascript
复制
PUT _ingest/pipeline/pipelineB
{
  "description": "outer pipeline",
  "processors": [
    {
      "pipeline": {
        "name": "pipelineA"
      }
    },
    {
      "set": {
        "field": "outer_pipeline_set",
        "value": "outer"
      }
    }
  ]
}

在这里 pipelineB 运用 pipelineA来形成自己的 pipeline。在 pipelineB 执行的时候,它先执行 pipelineA,然后向下执行。

Pipeline 应用场景

1) 我们可以在 _bulk API 中进行使用:

代码语言:javascript
复制
POST _bulk
{"index": {"_index": "my_index", "_id" : "1", "pipeline": "my_pipeline"}}
{"name": "zhang san", "category": "sports"}

Bulk API:
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }

2) 我们可以直接在 beats 中进行使用。具体可参阅文章 “Beats: Filebeat和pipeline processors” 及文章 “Beats:在 Beats 中实现动态 pipeline

3) 你也可以直接在 reindex 时使用,比如:

代码语言:javascript
复制
POST _reindex
{
  "source": {
    "index": "source"
  },
  "dest": {
    "index": "dest",
    "pipeline": "some_ingest_pipeline"
  }
}

4) 在 enrich processors 中使用。请参阅文章 “Elasticsearch:enrich processor (7.5发行版新功能)” 及文章 “如何使用 Elasticsearch ingest 节点来丰富日志和指标”。

5) 在 update_by_query API 中使用:

代码语言:javascript
复制
POST my_index/update_by_query?pipeline=my_pipeline

具体可以参阅我的文章 “Beats:运用Elastic Stack分析COVID-19数据并进行可视化分析”。

6)  可以在索引中设置 default_pipeline:

代码语言:javascript
复制
PUT my_pipeline
{
  "settings": {
    "default_pipeline": "my_pipeline"
  }
}

详细可以阅读文章 “Elasticsearch:Elastic可观测性 - 运用 pipeline 使数据结构化”。

内置 Processors

默认情况下,Elasticsearch 提供大量的ingest处理器。 我们可以在地址https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 找到已经为我设计好的内置的 processors。下面是一些常见的一些 processor 的列表:

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 定义 pipeline
  • Ingest APIs
    • Put pipeline API
      • Get pipeline API
        • Del pipeline API
          • 模拟 pipeline API
            • 引用 pipeline
            • Pipeline 应用场景
            • 内置 Processors
            相关产品与服务
            Elasticsearch Service
            腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档