前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Logstash实现数据处理

Logstash实现数据处理

原创
作者头像
沈小翊
修改2023-11-17 14:30:18
2320
修改2023-11-17 14:30:18
举报
文章被收录于专栏:大数据生态大数据生态

Logstash 是一款强大的数据传输工具,支持丰富多样的数据输入源与数据输出端,并且可以在管道中进行数据处理。Logstash的一条完整的数据传输链路就是一个管道,Logstash支持多个管道的自定义配置和并行。

下面是Demo演示:

Demo1

  • 仅传输符合Query的数据到目标索引

Demo2

  • 符合通配符匹配规则的多个索引数据输出到同一个目标索引中

Demo3

  • 将源端索引的不同type的数据分别输出到不同索引中,此场景通常用于ES 6版本集群索引迁移到ES 7集群

Demo4

  • 将源端索引的单个字段数据拆分到多个字段

Demo5

  • 如何实现ES中不同索引间的join操作

Query过滤

代码语言:javascript
复制
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    query => '{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "2023-09-26T07:20:48.124111Z",
        "lte": "2023-09-26T08:20:48.126163Z"
      }
    }
  }
}'
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
output {
    elasticsearch {
        hosts => ["10.0.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}

通配符匹配

将匹配符合规则的源端索引输出到相同index

代码语言:javascript
复制
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test1"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}
#输出所有tes开头的索引
output {
  if [@metadata][_index] =~ /^tes/ {
    elasticsearch {
        hosts => ["http://172.16.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
  }

}

将多Type索引的不同Type输出到不同索引中

代码语言:javascript
复制
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}


output {
        if [@metadata][_type] == "type1" {
            elasticsearch {
                hosts => ["http://10.0.xx.xx:9200"]
                user => "elastic"
                password => "passwd"
                index => "type1"
                document_type => "_doc"
                document_id => "%{[@metadata][_id]}"
                }
        }else if [@metadata][_type] == "type2"{
                elasticsearch {
                hosts => ["http://10.0.xx.xx:9200"]
                user => "elastic"
                password => "passwd"
                index => "type2"
                document_type => "_doc"
                document_id => "%{[@metadata][_id]}"
                }
        }
}

索引单个字段数据拆分为多字段

某个索引下面的字段 c : abc_123 想拆成两个字段 c1 :abc c2: 123

代码语言:javascript
复制
input {
  elasticsearch {
    hosts => ["10.0.xx.xx:9200"]
    user => "elastic"
    password => "passwd"
    index => "test1"
    docinfo => true
    size => 5000
    scroll => "5m"
  }
}

filter {
    mutate {
        split => ["c","_"]
    }

    if [c][0] {
        mutate {                
            add_field =>   {
                "c1" => "%{[c][0]}"
            }
        }
    }
    
    if [c][1] {
        mutate {                
            add_field =>   {
                "c2" => "%{[c][1]}"
            }
        }
    }   
}


output {
    elasticsearch {
        hosts => ["http://10.0.xx.xx:9200"]
        user => "elastic"
        password => "passwd"
        index => "result"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
    }
}

join

Logstash中无法实现多个索引间的聚合操作,ES中可以通过父子文档关系达到聚合效果

下面语句在ES上实现聚合操作(需要拆分其他多个索引数据到同一索引中)

1. 创建索引

关联字段为field3,field1为索引1独有数据,field2为索引2独有数据

field3作为父文档,关联子文档field1和field2

代码语言:javascript
复制
PUT my_index
{
  "mappings": {
    "properties": {
      "my_join_field": {
        "type": "join",
        "relations": {
          "field3": ["field1", "field2"]
        }
      }
    }
  }
}

2. 插入数据

指定插入的field2,field3父文档field3 id为1达到关联效果

代码语言:javascript
复制
PUT my_index/_doc/1
{
  "my_join_field":"field3"
}

PUT my_index/_doc/2?routing=1
{
  "my_join_field": {
    "name": "field2",
    "parent": "1"
  }
}

PUT my_index/_doc/3?routing=1
{
  "my_join_field": {
    "name": "field1",
    "parent": "1"
  }
}

3. 查询数据

对父文档进行查询即可找到所有field3相同的数据

代码语言:javascript
复制
GET my_index/_search
{
  "query": {
    "has_parent": {
      "parent_type": "field3", 
      "query": {
        "match": {
          "_id": "1"
        }
      }
    }
  }
}

详细使用可参考Parent Id Query | Elasticsearch Guide [6.8] | Elastic

其他logstash使用见Elasticsearch filter plugin | Logstash Reference [6.8] | Elastic

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Query过滤
  • 通配符匹配
  • 将多Type索引的不同Type输出到不同索引中
  • 索引单个字段数据拆分为多字段
  • join
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档