ES环境已经就绪,我们需要先调通一类简单日志作为数据上报全链路的试运行。恰好Linux系统日志就是这样一个角色,随处可见,风险可控。
这里,我们选择了2种Linux系统日志:/var/log/messages 和 /var/log/cron。
按照《【ES私房菜】Filebeat安装部署及配置详解》在需要采集系统日志的系统上部署filebeat,然后编写如下配置:
vim filebeat.yml
############################# input #########################################
filebeat.prospectors:
- input_type: log
paths: /var/log/messages
paths: /var/log/cron
document_type: "messages"
spool_size: 1024
idle_timeout: "5s"
name: 172.16.x.xxx
############################# Kafka #########################################
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["x.x.x.1:9092","x.x.x.2:9092","x.x.x.3:9092"]
# message topic selection + partitioning
topic: '%{[type]}'
flush_interval: 1s
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
path: /data/filebeat/logs
name: filebeat.log
keepfiles: 7
############################# console #########################################
#For Debug
output.console:
# Boolean flag to enable or disable the output module.
enabled: true
# Pretty print json event
pretty: true
这里我们先把 console 输出打开,然后启动filebeat观察输出:
{
"@timestamp": "2017-09-15T01:07:02.931Z",
"beat": {
"hostname": "TENCENT64.site",
"name": "172.16.x.xxx",
"version": "5.5.1"
},
"input_type": "log",
"message": "Sep 15 09:07:01 TENCENT64 CROND[815]: (root) CMD (/usr/local/sa/agent/secu-tcs-agent-mon-safe.sh \u003e /dev/null 2\u003e\u00261)",
"offset": 12991588,
"source": "/var/log/cron",
"type": "messages"
}
可以看到输出了如上的json格式数据,这些数据会原样上报kafka或logstash。
在数据上报ES之前,我们还需要先配置下相应的ES template。
当然,如果不想配置也可以,在后文的 logstash 的 output 模块中去掉【禁止管理模板的配置】即可,也就是让 logstash 自适应导入 template。
但是这样有个弊端,因为 logstash 是自适应匹配模板,可能有一些字段类型就不是那么准确,导致我们后面在Kibana里面就无法对一些字段进行聚合分析了。
根据messages字段内容,编写如下template:
{
"template": "messages-*",
"mappings": {
"log": {
"properties": {
"@timestamp": {
"include_in_all" : false,
"type" : "date"
},
"ip": {
"index": "not_analyzed",
"type": "ip"
},
"hostname": {
"index": "not_analyzed",
"type": "string"
},
"message": {
"type": "string"
},
"message_content": {
"type": "string"
},
"message_pid": {
"type": "string"
},
"message_program": {
"index": "not_analyzed",
"type": "string"
},
"message_timestramp": {
"include_in_all" : false,
"type" : "date"
}
}
}
}
}
Ps:这里就不详细说明字段的含义了,请参考系列文章《ElastiSearch template简介(整理中)》.
将上述模板保存为 messages.json 的文件,然后执行如下命令进行导入:
curl -XPUT http://x.x.x.x:9200/_template/template-messages -d @messages.json
正常将返回如下结果:
{
"acknowledged" : true
}
vim logstash.conf
input {
kafka {
# 指定kafka服务IP和端口(老版本是指定zookeeper服务,这里有所区别)
bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
# 取出 messages 数据
topics => "messages"
# 指定多个logstash为相同的 group_id,协同工作
group_id => "logstash"
# kafka 取出来的数据需要格式化为json
codec => json {
charset => "UTF-8"
}
# 加一个标签字段,用于后面判断
add_field => { "[@metadata][type]" => "messages" }
}
}
filter {
# 根据标签来处理messages数据
if [@metadata][type] == "messages" {
grok {
# 正则匹配
match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
# 亮点:由于源消息是没有上报IP的(只有主机名)这里做了点改造,将filebeat传来的name赋值给ip这个字段
add_field => [ "ip", "%{[beat][name]}" ]
# 如果成功匹配,则加一个匹配标签
add_tag => [ "matched" ]
}
# 如果标签不存在则丢弃这条消息
if ("matched" not in [tags]) {
drop {}
}
date {
# 对日志里面的时间戳进行格式转换,适配ES的时间格式
locale => "en_US"
timezone => "Asia/Shanghai"
match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
target => "@timestamp"
}
ruby {
code => "event['@timestamp'] = event['@timestamp'].getlocal"
}
mutate {
# 移除不需要的字段
remove_field => "[beat][hostname]"
remove_field => "[beat][name]"
remove_field => "@version"
remove_field => "[beat][version]"
remove_field => "input_type"
remove_field => "offset"
remove_field => "tags"
remove_field => "type"
remove_field => "host"
}
}
output {
# 上报系统日志
if [@metadata][type] == "messages" {
# stdout { codec => rubydebug } # 信息打屏,用于DEBUG
elasticsearch {
hosts => ["x.x.x.x:9200"]
index => "messages-%{+YYYY.MM.dd}"
# 禁止logstash管理模板,并指定上文上传的模板
manage_template => false
template_name => "template-messages"
}
}
}
因为数据格式的不同,grok正则表达式也要做相应的适配,这个过程应该是配置logstash最麻烦的地方。当然,这里我们可以用到 grok 的在线测试工具:http://grokdebug.herokuapp.com/
同样的,我们在配置完logstash之后,先屏蔽 elasticsearch 的上报,然后打开stdout:
stdout { codec => rubydebug }
进行调试,直到打屏数据满足我们的需求后再上报至ES。
有了数据之后,首次打开Kibana发现啥也看不到,此时我们需要先配置下Kibana的index,图解如下:
①、如图打开索引管理:
②、如图点击创建索引:
③、如图输入logstash指定的索引前缀,自动带出字段后选择时间戳字段,点击【Create】即可:
最后,回到Discover界面就能看到期待已久的高清美图了:
本文就介绍这么多,更多Kibana的奇淫巧计请关注《ES私房菜系列文章之教你玩转Kibana(整理中)》。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。