前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >《Learning ELK Stack》3 使用Logstash采集、解析和转换数据

《Learning ELK Stack》3 使用Logstash采集、解析和转换数据

作者头像
yeedomliu
发布2020-07-02 11:31:32
1.6K0
发布2020-07-02 11:31:32
举报
文章被收录于专栏:yeedomliu

3 使用Logstash采集、解析和转换数据

  • 理解Logstash如何采集、解析并将各种格式和类型的数据转换成通用格式,然后被用来为不同的应用构建多样的分析系统

配置Logstash

  • 输入插件将源头数据转换成通用格式的事件,过滤插件修改这些事件,最终输出插件将它们输出到其他系统

Logstash插件

列出Logstash的所有插件

代码语言:javascript
复制
bin/plugin list
  • 使用下面命令列出指定分组的插件
代码语言:javascript
复制
bin/plugin list --group <group_name>
bin/plugin list --group output

插件属性的数据类型

数组(Array
代码语言:javascript
复制
path => ["value1", "value2"]
布尔值(Boolean
代码语言:javascript
复制
periodic_flush => false
编解码器(Codec
  • 编解码器实际上并不是一种数据类型,它是在输入或输出的时候对数据进行解码或编码的一种方式。上面例子指定在输出时,编解码器会将所有输出数据编码成json格式
代码语言:javascript
复制
codec => "json"
哈希(Hash
  • 由一系列键值对组成的集合
代码语言:javascript
复制
match => {
    "key1" => "value1", "key2" => "value2"
}
字符串(String
代码语言:javascript
复制
value => "welcome"
注释(Comment
  • 以字符#开头
代码语言:javascript
复制
# 这是一个注释
字段引用
  • 可使用[field_name]的方式引用,嵌套字段可以使用[level1][level2]的方式指定

Logstash条件语句

  • 在某些条件下Logstash可以用条件语句来过滤事件或日志记录。Logstash中的条件处理和其他编程语言中的类似,使用ifif elseelse语句。多个if else语句块可以嵌套
代码语言:javascript
复制
if <conditional expression1> {
# 一些处理语句
}
else if <conditional expression2> {
# 一些处理语句
}
else {
# 一些其他语句
}
  • 条件语句可以与比较运算符、逻辑运算符和单目运算符一起使用
  • 比较运算符包括以下几种
  1. 相等运算符:==、!=、<、>、<=、>=
  2. 正则表达式:=~、!~
  3. 包含:innot in
  4. 逻辑运算符:andornandxor
  5. 单目运算符:!
代码语言:javascript
复制
filter {
  if [action] == "login" {
    mutate { remove => "password" }
  }
}
代码语言:javascript
复制
output {
    if [loglevel] == "ERROR" and [deployment] == "production" {
        email {}
    }
}

Logstash插件的类型

  1. 输入(Input
  2. 过滤器(Filter
  3. 输出(Output
  4. 编解码(Codec
输入插件
文件(file

Logstash文件输入插件将文件读取的最新位点保存在$HOME/.sincdb*的文件中。文件路径和刷新频率可以通过sincedb_path和sincdb_write_interval配置

代码语言:javascript
复制
input {
    file {
        path => "/GOOG.csv"
        add_field => {"input_time" => "%{@timestamp}"}
        codec => "json"
        delimiter => "\n"
        exclude => "*.gz"
        sincedb_path => "$HOME/.sincedb*"
        sincedb_write_interval => 15
        start_position => "end"
        tags => ["login"]
        type => ["apache"]
    }
}

选项

数据类型

是否必选

默认值

说明

add_field

hash

{}

增加字段

codec

string

plain

用于指定编解码器输入

delimiter

string

`n `

分隔符

exclude

array

[]

排除指定类型文件

sincedb_path

string

$HOME/.sincedb

监视文件当前读取位点

sincedb_write_interval

int

15

指定sincedb文件写入频率

start_position

string

end

输入文件的初始读取位点

tags

array

给输入事件增加一系列标签

type

string

给多个输入路径中配置的不同类型的事件指定type名称

path

array

日志文件路径

代码语言:javascript
复制
input {
    file {
        path => ["/var/log/syslog/*"]
        type => "syslog"
    }
    file {
        path => ["/var/log/apache/*"]
        type => "apache"
    }
}
filter {
    if [type] == "syslog" {
        grok{}
    }
    if [type] == "apache" {
        grok{}
    }
    if "login" == tags[] {}
}
Redis
  • redis实例中读取事件和日志。经常用于输入数据的消息代理,将输入数据缓存到队列,等待索引器读取日志

选项

数据类型

是否必选

默认值

说明

add_field

hash

{}

增加字段

codec

string

plain

用于指定编解码器输入

data_type

string

list

list(BLPOP)、channel(SUBSCRIBE命令订阅key)、pattern_channel(PSUBSCRIBE命令订阅key)

host

string

127.0.0.1

key

string

password

string

port

int

6379

文档链接:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

输出
elasticsearch
  • 最重要的输出插件
过滤器
  • 用于在输出插件输出结果之前,对输入插件中读取的事件进行中间处理。常用于识别输入事件的字段,并对输入事件的部分内容进行条件判断处理
csv
  • 用于将csv文件输入的数据进行解析,并将值赋给字段
代码语言:javascript
复制
csv {
    columns => ["date_of_record","open","high","low","close","volume","adj_close"]
    separator => ","
}
date
  • 给事件赋予正确的时间戳非常重要,只有这样才能在Kibana中使用时间过滤器对事件进行分析
代码语言:javascript
复制
date {
    match => ["date_of_record", "yyyy-MM-dd"]
}
drop
  • 将满足条件的所有事件都丢弃掉,这个过滤插件有下面这些配置选项
  1. add_field
  2. add_tag
  3. remove_field
  4. remove_tag
代码语言:javascript
复制
filter {
    if [fieldname == "test"] {
        drop {}
    }
}
geoip
  • 基于输入事件中的IP地址给事件增加地理位置信息。这些信息从Maxmind数据库中读取

Maxmind是一个专门提供IP地址信息产品的公司。GeoIP是它们开发的智能IP产品,用于IP地址的位置跟踪。所有Logstash版本都自带一个Maxmind的GeoLite城市数据库。这个地址数据库可以从https://dev.maxmind.com/geoip/geoip2/geolite2/获取

代码语言:javascript
复制
geoip {
    source => # 必选字符串,需要使用geoip服务进行映射的ip地址或主机名
}
grok
  • 目前为止最流行、最强大的插件。使用它可以解析任何非结构化的日志事件,并将日志转化成一系列结构化的字段,用于后续的日志处理和分析
  • 可以用于解析任何类型的日志,包括apachemysql、自定义应用日志或者任何事件中非结构化的文本
  • Logstash默认包含了很多grok模式,可以直接用来识别特定类型的字段,也支持自定义正则表达式
  • 所有可用grok模式从这里获取:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
代码语言:javascript
复制
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
  • 上面grok模式可以使用下面这样的操作符直接识别这些类型的字段。希望将日志事件中代表主机名的文本赋值给host_name这个字段
代码语言:javascript
复制
%{HOSTNAME:host_name}
  • 看一下如何用grok模式表示一行HTTP日志
代码语言:javascript
复制
54.3.245.1 GET /index.html 14562 0.056
  • grok模式是这样的
代码语言:javascript
复制
%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}
代码语言:javascript
复制
filter {
    grok {
        match => { "message" => "%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}" }
    }
}
  • 使用grok过滤器处理上面的事件后,可以看到事件中增加了如下字段和值
  1. client_ip:54.3.245.1
  2. request_methodGET
  3. uri_path:/index.html
  4. bytes_transferred:14562
  5. duration:0.056
  • 如果grok模式中没有需要的模式,可以使用正则表达式创建自定义模式

设计和测试grok模式 http://grokdebug.herokuapp.com/ http://grokconstructor.appspot.com/do/match?example=1

mutate
  • 对输入事件进行重命名、移除、替换和修改字段。也用于转换字段的数据类型、合并两个字段、将文本从小写转换为大写等
sleep
  • Logstash置于sleep模式,时间由参数指定,也可以基于事件指定sleep频率
  • 如果希望每处理五个事件就sleep一秒,可以这样配置
代码语言:javascript
复制
filter {
    sleep {
        time => "1"
        every => 5
    }
}
编解码
  • 用于对输入事件进行解码,对输出事件进行解码,以流式过滤器的形式在输入插件和输出插件中工作,重要的编解码插件包括
  1. avro
  2. json
  3. line
  4. multiline
  5. plain
  6. rubydebug
  7. spool

输入事件或输出事件是完整的json文档,可以这样配置(其中一种方式就可以)

代码语言:javascript
复制
input {
    stdin { codec => "json" }
    stdin { codec => json{} }
}

将每行输入日志作为一个事件,将每个输出事件解码成一行

代码语言:javascript
复制
input {
    stdin { codec => line{} }
    stdin { codec => "line" }
}

把多行日志作为一个事件处理

代码语言:javascript
复制
input {
    file {
        path => "/var/log/someapp.log"
        codec => multiline {
            pattern => "^%{TIMESTAMP_ISO8601}"
            negate => true
            what => previous
        }
    }
}

rubydebug在输出事件时使用,使用Ruby Awesome打印库打印输出事件

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-07-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 yeedomliu 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 3 使用Logstash采集、解析和转换数据
    • 配置Logstash
      • Logstash插件
        • 列出Logstash的所有插件
        • 插件属性的数据类型
        • Logstash条件语句
        • Logstash插件的类型
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档