<<<事情比较多没有文章输出,一个小时总结下近期使用的知识点>>>
1,kibana 监控 Logstash状态
logstash.yml 文件
egrep -v "^#|^$" /etc/logstash/logstash.yml
path.data: /var/lib/logstash
pipeline.workers: 2
pipeline.batch.size: 256
pipeline.batch.delay: 500
http.host: "192.168.99.185"
log.level: info
path.logs: /var/log/logstash
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: ******
xpack.monitoring.elasticsearch.hosts: ["http://192.168.99.185:9200", "http://192.168.99.186:9200"]
重启logstash
systemctl restart logstash
kibana 查看logstash 状态
2,logstash 多实例运行
logstash 启动多个conf 文件进行日志处理时,默认不是每个配置文件独立运行,而是作为一个整体,每个input会匹配所有的filter,然后匹配所有的output,这时就会导致数据被错误的处理以及发送到错误的地方;利用tags字段进行字段匹配避免数据被错误的处理。
filebeat 添加tags 字段
- type: log
enabled: true
tags: ["secure"]
paths:
- /var/log/secure
fields:
log_topic: linuxos
include_lines: ["Accepted password","Failed password"]
egrep -v "*#|^$" system.conf
input 输入
input {
kafka {
codec => "json"
topics => ["linuxos"]
bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
group_id => "os"
auto_offset_reset => "latest"
}
}
filter匹配tags 字段
filter {
if "secure" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?\: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
}
}
}
else if "messages" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"
}
}
}
mutate {
remove_field => ["time","offset","host","@version","[log]","[prospector]","[beat]","[input][type]"]
}
}
output 匹配tags字段输出1
output{
#stdout{codec => rubydebug}
if "secure" in [tags]{
elasticsearch{
index => "secure-%{+YYYY.MM.dd}"
hosts => ["192.168.99.185:9200","192.168.99.186:9200"]
user => "elastic"
password => "******"
}
}
if "messages" in [tags]{
elasticsearch{
index => "messages-%{+YYYY.MM.dd}"
hosts => ["192.168.99.185:9200","192.168.99.186:9200"]
user => "elastic"
password => "******"
}
}
}
output 匹配tags字段输出2
output{
#stdout{codec => rubydebug}
if "ism-test-winlogbeat" in [tags]{
elasticsearch{
index => "ism-test-winlogbeat-%{+YYYY.MM.dd}"
hosts => ["192.168.99.185:9200","192.168.99.186:9200"] .209.208.52:9200"]
user => "elastic"
password => "******"
}
}
else if "ism-prod-winlogbeat" in [tags]{
elasticsearch{
index => "ism-prod-winlogbeat-%{+YYYY.MM.dd}"
hosts => ["192.168.99.185:9200","192.168.99.186:9200"] .209.208.52:9200"]
user => "elastic"
password => "******"
}
}
else{
elasticsearch{
index => "ism-unknow-winlogbeat-%{+YYYY.MM.dd}"
hosts => ["192.168.99.185:9200","192.168.99.186:9200"] .209.208.52:9200"]
user => "elastic"
password => "******"
}
}
}
3,Kafka
查看topic为network的描述信息
./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic network
注意 一个partitions只会被一个consumer消费(为了保证消息的顺序)
num.partitions=3
查看kafka 数据挤压情况
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.232:9092,192.168.99.233:9092,192.168.99.221:9092 --group logstash --describe
查看kafka 消费组
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.232:9092,192.168.99.233:9092,192.168.99.221:9092 --list
官方参数链接
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
logstash-input-kafka插件配置文件
input {
kafka {
codec => "json"
topics => ["network"]
bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
group_id => "logstash1"
auto_offset_reset => "latest"
consumer_threads => 2
}
}
Consumer Group:是个逻辑上的概念,为一组consumer的集合,同一个topic的数据会广播给不同的group,同一个group中只有一个consumer能拿到这个数据。
也就是说对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个consumer消费,
logstash消费kafka集群的配置,其中加入了group_id参数,group_id是一个的字符串,唯一标识一个group,具有相同group_id的consumer构成了一个consumer group,
这样启动多个logstash进程,只需要保证group_id一致就能达到logstash高可用的目的,一个logstash挂掉同一Group内的logstash可以继续消费。
除了高可用外同一Group内的多个Logstash可以同时消费kafka内topic的数据,从而提高logstash的处理能力,但需要注意的是消费kafka数据时,每个consumer最多只能使用一个partition,当一个Group内consumer的数量大于partition的数量时,只有等于partition个数的consumer能同时消费,其他的consumer处于等待状态。
例如一个topic下有3个partition,那么在一个有5个consumer的group中只有3个consumer在同时消费topic的数据,而另外两个consumer处于等待状态,所以想要增加logstash的消费性能,可以适当的增加topic的partition数量,但kafka中partition数量过多也会导致kafka集群故障恢复时间过长,消耗更多的文件句柄与客户端内存等问题,也并不是partition配置越多越好,需要在使用中找到一个平衡。
consumer_threads(并行传输)
Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka插件中对应的配置项是consumer_threads,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:
kafka的topic是分区的,数据存储在每个分区内;
kafka的consumer是分组的,任何一个consumer属于某一个组,一个组可以包含多个consumer,同一个组内的consumer不会重复消费的同一份数据。
所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。
举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。
如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。
所以,一般consumer_threads配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数即可。
例如:启动了2个logstash,分区数partition为8,那么consumer_threads为4;
auto_offset_reset
Kafka 中没有初始偏移量或偏移量超出范围时该怎么办:
earliest:将偏移量自动重置为最早的偏移量
latest:自动将偏移量重置为最新偏移量
none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
4,Filebeat
数据流
filebeat--kafka--logstash--elasticsearch
文件读取
tail_files: 默认为false。配置为 true 时,filebeat 将从新文件的最后位置开始读取,而不是从开头读取新文件, 注意:如果配合日志轮循使用,新文件的第一行将被跳过。
此选项适用于 Filebeat 尚未处理的文件。如果先前运行了Filebeat并且文件的状态已经保留,tail_files 则不会应用。
第一次运行 Filebeat 时,可以使用 tail_files: true 来避免索引旧的日志行。第一次运行后,建议禁用此选项。
registry file
filebeat 会将自己处理日志文件的进度信息写入到registry文件中,以保证filebeat在重启之后能够接着处理未处理过的数据,而无需从头开始。
如果要让 filebeat 从头开始读文件,需要停止filebeat,然后删除registry file:
systemctl stop filebeat ;
rm -rf /var/lib/filebeat/registry/* ;
systemctl start filebaet
registry 文件里字段的解释:
多行合并 multiline
官方链接:
https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html#multiline
multiline.pattern:正则表达式
multiline.negate:true 或 false;#默认是 false,匹配 pattern 的行合并到上一行;true,不匹配pattern的行合并到上一行
multiline.match:after 或 before;
#合并到上一行的末尾或开头
multiline.max_lines
#可以合并成一个事件的最大行数。#如果一个多行消息包含的行数超过max_lines,则超过的行被丢弃。默认是500。
时间戳
multiline.pattern: '^\[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
[0-9]{4}-[0-9]{2}-[0-9]{2}
4个数字-两个数字-两个数字
output
#------------------------------kafka output---------------------------------------
output.kafka:
enabled: true
hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
fields_under_root
如果将此选项设置为 true(默认为 false),则自定义字段将作为顶级字段存储在输出文档中,而不是分组在 fields 子词典下。如果自定义字段名称与其他字段名称冲突,则自定义字段将覆盖其他字段。
fields_under_root: true
fields:
hostip: 192.168.99.50
processor
processors:
- drop_fields: #删除字段,默认情况自动添加beat字段
fields: ["log", "input", "host", "agent", "ecs"]
5,ubuntu 安装filebeat 监控用户登录和系统错误日志
下载filebeat 软件包
sudo curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.4-amd64.deb
安装filebeat 软件包
sudo dpkg -i filebeat-6.8.4-amd64.deb
filebeat配置文件
ubuntu@ecv-node1:/mnt$ sudo egrep -v "*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
enabled: true
tags: ["secure"]
paths:
- /var/log/auth.log
fields:
log_topic: linuxos
include_lines: ["Accepted password","Failed password"]
exclude_lines: ['message']
- type: log
enabled: true
paths:
- /var/log/syslog
fields:
log_topic: linuxos
tags: ["messages"]
include_lines: ['Failed','error','ERROR']
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 3
fields_under_root: true
fields:
ipaddress: 192.168.99.54
setup.kibana:
output.kafka:
enabled: true
hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
processors:
- drop_fields:
fields: ["beat","log","name","os"]
- add_host_metadata: ~
- add_cloud_metadata: ~
/var/log/auth.log --用户登陆日志
/var/log/syslog -- 系统日志
6,zabbix 监控systemctl 服务状态
判断服务运行脚本
[root@elk-node2 zabbix]# pwd
/etc/zabbix
[root@elk-node2 zabbix]# cat service.sh
#!/bin/bash
stat=`systemctl status $1 |grep Active|awk '{print $3}'|cut -c 2-8`
if [ $stat == running ];then
echo 1
else
echo 0
fi
zabbix-agent 配置文件
UnsafeUserParameters=1
UserParameter=service[*],sh /etc/zabbix/service.sh $1
zabbix_get 测试
[root@ZABBIX-Server /]# zabbix_get -s 192.168.99.186 -k service[firewalld]
1
监控项
触发器
最新数据
kibana 仪表盘