使用Filebeat和Logstash集中归档日志

方 案

  • Filebeat->Logstash->Files
  • Filebeat->Redis->Logstash->Files
  • Nxlog(Rsyslog、Logstash)->Kafka->Flink(Logstash->ES-Kibana)
  • 其他方案(可根据自己需求,选择合适的架构,作者选择了第二种方案)

注释: 由于Logstash无法处理输出到文件乱序的问题,可通过不同的文件使用不同的Logstash;或者直接写入ES(不存在乱序问题)、通过Flink输出到文件

部 署

系统环境
  • Debian8 x64
  • logstash-6.1.1
  • filebeat-6.1.1-amd64
  • Redis-3.2
Filebeat配置
/etc/filebeat/filebeat.yml
 
filebeat.prospectors:
- type: log
paths:
- /home/data/log/*
- /home/data/*.log
scan_frequency: 20s
encoding: utf-8
tail_files: true
harvester_buffer_size: 5485760
fields:
ip_address: 192.168.2.2
env: qa
output.redis:
hosts: ["192.168.1.1:6379"]
password: "geekwolf"
key: "filebeat"
db: 0
timeout: 5
max_retires: 3
worker: 2
bulk_max_size: 4096
Logstash配置
input {
#Filebeat
# beats {
# port => 5044
# }
#Redis
redis {
batch_count => 4096
data_type => "list"
key => "filebeat"
host => "127.0.0.1"
port => 5044
password => "geekwolf"
db => 0
threads => 2
}
}
filter {
ruby {
code => 'event.set("filename",event.get("source").split("/")[-1])'
}
}
output {
 
if [filename] =~ "nohup" {
file {
path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/%{filename}"
flush_interval => 3
codec => line { format => "%{message}"}
}
} else {
file {
path => "/data/logs/%{[fields][env]}/%{+YYYY-MM-dd}/%{[fields][ip_address]}/logs/%{filename}"
flush_interval => 3
codec => line { format => "%{message}"}
}
}
#stdout { codec => rubydebug }
 
}

生产日志目录

├── prod
│   └── 2018-01-13
│   └── 2.2.2.2
│   ├── logs
│   │   ├── rpg_slow_db_.27075
│   └── nohup_service.log
└── qa
├── 2018-01-12
│   ├── 192.168.3.1
└── 2018-01-13
├── 192.168.3.2

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

扫码关注云+社区