Logstash
采集、解析和转换数据Logstash
如何采集、解析并将各种格式和类型的数据转换成通用格式,然后被用来为不同的应用构建多样的分析系统Logstash
Logstash
插件Logstash
的所有插件bin/plugin list
bin/plugin list --group <group_name>
bin/plugin list --group output
Array
)path => ["value1", "value2"]
Boolean
)periodic_flush => false
Codec
)json
格式codec => "json"
Hash
)match => {
"key1" => "value1", "key2" => "value2"
}
String
)value => "welcome"
Comment
)# 这是一个注释
field_name
]的方式引用,嵌套字段可以使用[
level1][
level2]
的方式指定Logstash
条件语句Logstash
可以用条件语句来过滤事件或日志记录。Logstash
中的条件处理和其他编程语言中的类似,使用if
、if else
和else
语句。多个if else
语句块可以嵌套if <conditional expression1> {
# 一些处理语句
}
else if <conditional expression2> {
# 一些处理语句
}
else {
# 一些其他语句
}
in
、not in
and
、or
、nand
、xor
filter {
if [action] == "login" {
mutate { remove => "password" }
}
}
output {
if [loglevel] == "ERROR" and [deployment] == "production" {
email {}
}
}
Logstash
插件的类型Input
)Filter
)Output
)Codec
)file
)Logstash文件输入插件将文件读取的最新位点保存在$HOME/.sincdb*的文件中。文件路径和刷新频率可以通过sincedb_path和sincdb_write_interval配置
input {
file {
path => "/GOOG.csv"
add_field => {"input_time" => "%{@timestamp}"}
codec => "json"
delimiter => "\n"
exclude => "*.gz"
sincedb_path => "$HOME/.sincedb*"
sincedb_write_interval => 15
start_position => "end"
tags => ["login"]
type => ["apache"]
}
}
选项 | 数据类型 | 是否必选 | 默认值 | 说明 |
---|---|---|---|---|
add_field | hash | 否 | {} | 增加字段 |
codec | string | 否 | plain | 用于指定编解码器输入 |
delimiter | string | 否 | `n ` | 分隔符 |
exclude | array | 否 | [] | 排除指定类型文件 |
sincedb_path | string | 否 | $HOME/.sincedb | 监视文件当前读取位点 |
sincedb_write_interval | int | 否 | 15 | 指定sincedb文件写入频率 |
start_position | string | 否 | end | 输入文件的初始读取位点 |
tags | array | 否 | 给输入事件增加一系列标签 | |
type | string | 否 | 给多个输入路径中配置的不同类型的事件指定type名称 | |
path | array | 是 | 日志文件路径 |
input {
file {
path => ["/var/log/syslog/*"]
type => "syslog"
}
file {
path => ["/var/log/apache/*"]
type => "apache"
}
}
filter {
if [type] == "syslog" {
grok{}
}
if [type] == "apache" {
grok{}
}
if "login" == tags[] {}
}
Redis
redis
实例中读取事件和日志。经常用于输入数据的消息代理,将输入数据缓存到队列,等待索引器读取日志选项 | 数据类型 | 是否必选 | 默认值 | 说明 |
---|---|---|---|---|
add_field | hash | 否 | {} | 增加字段 |
codec | string | 否 | plain | 用于指定编解码器输入 |
data_type | string | 否 | list | list(BLPOP)、channel(SUBSCRIBE命令订阅key)、pattern_channel(PSUBSCRIBE命令订阅key) |
host | string | 否 | 127.0.0.1 | |
key | string | 否 | ||
password | string | 否 | ||
port | int | 否 | 6379 |
文档链接:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
elasticsearch
csv
csv
文件输入的数据进行解析,并将值赋给字段csv {
columns => ["date_of_record","open","high","low","close","volume","adj_close"]
separator => ","
}
date
Kibana
中使用时间过滤器对事件进行分析date {
match => ["date_of_record", "yyyy-MM-dd"]
}
drop
add_field
add_tag
remove_field
remove_tag
filter {
if [fieldname == "test"] {
drop {}
}
}
geoip
IP
地址给事件增加地理位置信息。这些信息从Maxmind
数据库中读取Maxmind是一个专门提供IP地址信息产品的公司。GeoIP是它们开发的智能IP产品,用于IP地址的位置跟踪。所有Logstash版本都自带一个Maxmind的GeoLite城市数据库。这个地址数据库可以从https://dev.maxmind.com/geoip/geoip2/geolite2/获取
geoip {
source => # 必选字符串,需要使用geoip服务进行映射的ip地址或主机名
}
grok
apache
、mysql
、自定义应用日志或者任何事件中非结构化的文本Logstash
默认包含了很多grok
模式,可以直接用来识别特定类型的字段,也支持自定义正则表达式grok
模式从这里获取:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
grok
模式可以使用下面这样的操作符直接识别这些类型的字段。希望将日志事件中代表主机名的文本赋值给host_name
这个字段%{HOSTNAME:host_name}
grok
模式表示一行HTTP
日志54.3.245.1 GET /index.html 14562 0.056
grok
模式是这样的%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}
filter {
grok {
match => { "message" => "%{IP:client_ip} %{WORD:request_method} %{URIPATHPARAM:uri_path} %{NUMBER:bytes_transfered} %{NUMBER:duration}" }
}
}
grok
过滤器处理上面的事件后,可以看到事件中增加了如下字段和值client_ip
:54.3.245.1request_method
:GET
uri_path
:/index.html
bytes_transferred
:14562duration
:0.056grok
模式中没有需要的模式,可以使用正则表达式创建自定义模式设计和测试grok模式 http://grokdebug.herokuapp.com/ http://grokconstructor.appspot.com/do/match?example=1
mutate
sleep
Logstash
置于sleep
模式,时间由参数指定,也可以基于事件指定sleep
频率sleep
一秒,可以这样配置filter {
sleep {
time => "1"
every => 5
}
}
avro
json
line
multiline
plain
rubydebug
spool
输入事件或输出事件是完整的json文档,可以这样配置(其中一种方式就可以)
input {
stdin { codec => "json" }
stdin { codec => json{} }
}
将每行输入日志作为一个事件,将每个输出事件解码成一行
input {
stdin { codec => line{} }
stdin { codec => "line" }
}
把多行日志作为一个事件处理
input {
file {
path => "/var/log/someapp.log"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => previous
}
}
}
rubydebug在输出事件时使用,使用Ruby Awesome打印库打印输出事件