前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >logstash kafka filebeat zabbix

logstash kafka filebeat zabbix

作者头像
Kevin song
发布2021-03-08 15:37:27
1K0
发布2021-03-08 15:37:27
举报

<<<事情比较多没有文章输出,一个小时总结下近期使用的知识点>>>

1,kibana 监控 Logstash状态

logstash.yml 文件

代码语言:javascript
复制
egrep -v "^#|^$" /etc/logstash/logstash.yml 
path.data: /var/lib/logstash
pipeline.workers: 2
pipeline.batch.size: 256
pipeline.batch.delay: 500
http.host: "192.168.99.185"
log.level: info
path.logs: /var/log/logstash
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic        
xpack.monitoring.elasticsearch.password: ******
xpack.monitoring.elasticsearch.hosts: ["http://192.168.99.185:9200", "http://192.168.99.186:9200"]

重启logstash

代码语言:javascript
复制
systemctl  restart  logstash

kibana 查看logstash 状态

2,logstash 多实例运行

logstash 启动多个conf 文件进行日志处理时,默认不是每个配置文件独立运行,而是作为一个整体,每个input会匹配所有的filter,然后匹配所有的output,这时就会导致数据被错误的处理以及发送到错误的地方;利用tags字段进行字段匹配避免数据被错误的处理。

filebeat 添加tags 字段

代码语言:javascript
复制
- type: log
  enabled: true
  tags: ["secure"]
  paths:
    - /var/log/secure
  fields:
   log_topic: linuxos
  include_lines: ["Accepted password","Failed password"]

egrep -v "*#|^$" system.conf

input 输入

代码语言:javascript
复制
input {
   kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "os"
    auto_offset_reset => "latest"
  }  
}

filter匹配tags 字段

代码语言:javascript
复制
filter {
  if "secure" in [tags] {
    grok {
        match => {
           "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?\: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
        }
    }
 }
  else if "messages" in [tags] {
     grok {
        match => {
           "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"
        }
    }
 }
mutate {
      remove_field => ["time","offset","host","@version","[log]","[prospector]","[beat]","[input][type]"]
    }
 }

output 匹配tags字段输出1

代码语言:javascript
复制
output{
#stdout{codec => rubydebug}
  if "secure" in [tags]{
elasticsearch{
    index => "secure-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200","192.168.99.186:9200"]
    user => "elastic"
    password => "******"
    }
  }
 if "messages" in [tags]{
elasticsearch{
    index => "messages-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200","192.168.99.186:9200"] 
    user => "elastic"
    password => "******"
    }
  }
}

output 匹配tags字段输出2

代码语言:javascript
复制
output{
#stdout{codec => rubydebug}
if "ism-test-winlogbeat" in [tags]{
elasticsearch{
    index => "ism-test-winlogbeat-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200","192.168.99.186:9200"]                                                                                                                                                             .209.208.52:9200"]
    user => "elastic"
    password => "******"
    }
  }
else if "ism-prod-winlogbeat" in [tags]{
elasticsearch{
    index => "ism-prod-winlogbeat-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200","192.168.99.186:9200"]                                                                                                                                                             .209.208.52:9200"]
    user => "elastic"
    password => "******"
    }
  }
else{
elasticsearch{
    index => "ism-unknow-winlogbeat-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200","192.168.99.186:9200"]                                                                                                                                                             .209.208.52:9200"]
    user => "elastic"
    password => "******"
    }
  }
}

3,Kafka

查看topic为network的描述信息

代码语言:javascript
复制
./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic network

注意 一个partitions只会被一个consumer消费(为了保证消息的顺序)

代码语言:javascript
复制
num.partitions=3

查看kafka 数据挤压情况

代码语言:javascript
复制
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.232:9092,192.168.99.233:9092,192.168.99.221:9092 --group logstash  --describe

查看kafka 消费组

代码语言:javascript
复制
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.232:9092,192.168.99.233:9092,192.168.99.221:9092 --list

官方参数链接

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

logstash-input-kafka插件配置文件

代码语言:javascript
复制
input {
  kafka {
    codec => "json"
    topics => ["network"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "logstash1" 
    auto_offset_reset => "latest"
    consumer_threads => 2
}  
}

Consumer Group:是个逻辑上的概念,为一组consumer的集合,同一个topic的数据会广播给不同的group,同一个group中只有一个consumer能拿到这个数据。

也就是说对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个consumer消费,

logstash消费kafka集群的配置,其中加入了group_id参数,group_id是一个的字符串,唯一标识一个group,具有相同group_id的consumer构成了一个consumer group,

这样启动多个logstash进程,只需要保证group_id一致就能达到logstash高可用的目的,一个logstash挂掉同一Group内的logstash可以继续消费。

除了高可用外同一Group内的多个Logstash可以同时消费kafka内topic的数据,从而提高logstash的处理能力,但需要注意的是消费kafka数据时,每个consumer最多只能使用一个partition,当一个Group内consumer的数量大于partition的数量时,只有等于partition个数的consumer能同时消费,其他的consumer处于等待状态。

例如一个topic下有3个partition,那么在一个有5个consumer的group中只有3个consumer在同时消费topic的数据,而另外两个consumer处于等待状态,所以想要增加logstash的消费性能,可以适当的增加topic的partition数量,但kafka中partition数量过多也会导致kafka集群故障恢复时间过长,消耗更多的文件句柄与客户端内存等问题,也并不是partition配置越多越好,需要在使用中找到一个平衡。

consumer_threads(并行传输)

Logstash的input读取数的时候可以多线程并行读取,logstash-input-kafka插件中对应的配置项是consumer_threads,默认值为1。一般这个默认值不是最佳选择,那这个值该配置多少呢?这个需要对kafka的模型有一定了解:

kafka的topic是分区的,数据存储在每个分区内;

kafka的consumer是分组的,任何一个consumer属于某一个组,一个组可以包含多个consumer,同一个组内的consumer不会重复消费的同一份数据。

所以,对于kafka的consumer,一般最佳配置是同一个组内consumer个数(或线程数)等于topic的分区数,这样consumer就会均分topic的分区,达到比较好的均衡效果。

举个例子,比如一个topic有n个分区,consumer有m个线程。那最佳场景就是n=m,此时一个线程消费一个分区。如果n小于m,即线程数多于分区数,那多出来的线程就会空闲。

如果n大于m,那就会存在一些线程同时消费多个分区的数据,造成线程间负载不均衡。

所以,一般consumer_threads配置为你消费的topic的所包含的partition个数即可。如果有多个Logstash实例,那就让实例个数 * consumer_threads等于分区数即可。

例如:启动了2个logstash,分区数partition为8,那么consumer_threads为4;

auto_offset_reset

Kafka 中没有初始偏移量或偏移量超出范围时该怎么办:

earliest:将偏移量自动重置为最早的偏移量

latest:自动将偏移量重置为最新偏移量

none:如果未找到消费者组的先前偏移量,则向消费者抛出异常

4,Filebeat

数据流

filebeat--kafka--logstash--elasticsearch

文件读取

tail_files: 默认为false。配置为 true 时,filebeat 将从新文件的最后位置开始读取,而不是从开头读取新文件, 注意:如果配合日志轮循使用,新文件的第一行将被跳过。

此选项适用于 Filebeat 尚未处理的文件。如果先前运行了Filebeat并且文件的状态已经保留,tail_files 则不会应用。

第一次运行 Filebeat 时,可以使用 tail_files: true 来避免索引旧的日志行。第一次运行后,建议禁用此选项。

registry file

filebeat 会将自己处理日志文件的进度信息写入到registry文件中,以保证filebeat在重启之后能够接着处理未处理过的数据,而无需从头开始。

如果要让 filebeat 从头开始读文件,需要停止filebeat,然后删除registry file:

代码语言:javascript
复制
systemctl stop filebeat ;
rm -rf /var/lib/filebeat/registry/* ; 
systemctl start filebaet

registry 文件里字段的解释:

  • source:日志文件的路径
  • offset:已经采集的日志的字节数;已经采集到日志的哪个字节位置
  • inode:日志文件的inode号
  • device:日志所在的磁盘编号
  • timestamp:日志最后一次发生变化的时间戳
  • ttl:采集失效时间,-1表示只要日志存在,就一直采集该日志

多行合并 multiline

官方链接:

https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html#multiline

代码语言:javascript
复制
multiline.pattern:正则表达式
multiline.negate:true 或 false;#默认是 false,匹配 pattern 的行合并到上一行;true,不匹配pattern的行合并到上一行
multiline.match:after 或 before;
#合并到上一行的末尾或开头
multiline.max_lines
#可以合并成一个事件的最大行数。#如果一个多行消息包含的行数超过max_lines,则超过的行被丢弃。默认是500。

时间戳

代码语言:javascript
复制
multiline.pattern: '^\[0-9]{4}-[0-9]{2}-[0-9]{2}'
 multiline.negate: true
 multiline.match: after

[0-9]{4}-[0-9]{2}-[0-9]{2}

4个数字-两个数字-两个数字

output

代码语言:javascript
复制
#------------------------------kafka output---------------------------------------
output.kafka:
  enabled: true
  hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

fields_under_root

如果将此选项设置为 true(默认为 false),则自定义字段将作为顶级字段存储在输出文档中,而不是分组在 fields 子词典下。如果自定义字段名称与其他字段名称冲突,则自定义字段将覆盖其他字段。

代码语言:javascript
复制
fields_under_root: true
fields:
  hostip: 192.168.99.50

processor

代码语言:javascript
复制
processors:
  - drop_fields:    #删除字段,默认情况自动添加beat字段
      fields: ["log", "input", "host", "agent", "ecs"]

5,ubuntu 安装filebeat 监控用户登录和系统错误日志

下载filebeat 软件包

代码语言:javascript
复制
sudo curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.4-amd64.deb

安装filebeat 软件包

代码语言:javascript
复制
sudo dpkg -i filebeat-6.8.4-amd64.deb

filebeat配置文件

代码语言:javascript
复制
ubuntu@ecv-node1:/mnt$ sudo egrep -v "*#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  tags: ["secure"]
  paths:
    - /var/log/auth.log
  fields:
   log_topic: linuxos
  include_lines: ["Accepted password","Failed password"]
  exclude_lines: ['message']
- type: log
  enabled: true
  paths:
    - /var/log/syslog
  fields:
   log_topic: linuxos
  tags: ["messages"]
  include_lines: ['Failed','error','ERROR'] 
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
fields_under_root: true
fields:
  ipaddress: 192.168.99.54
setup.kibana:
output.kafka:
  enabled: true 
  hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
  topic: '%{[fields][log_topic]}' 
  partition.round_robin:
  reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
processors:
  - drop_fields:
     fields: ["beat","log","name","os"]
  - add_host_metadata: ~
  - add_cloud_metadata: ~

/var/log/auth.log --用户登陆日志

/var/log/syslog -- 系统日志

6,zabbix 监控systemctl 服务状态

判断服务运行脚本

代码语言:javascript
复制
[root@elk-node2 zabbix]# pwd
/etc/zabbix
[root@elk-node2 zabbix]# cat   service.sh 
#!/bin/bash
stat=`systemctl status $1 |grep Active|awk '{print $3}'|cut -c 2-8`
if [ $stat == running ];then
   echo 1
else
   echo 0
fi

zabbix-agent 配置文件

代码语言:javascript
复制
UnsafeUserParameters=1
UserParameter=service[*],sh /etc/zabbix/service.sh $1

zabbix_get 测试

代码语言:javascript
复制
[root@ZABBIX-Server /]# zabbix_get    -s   192.168.99.186 -k service[firewalld]
1

监控项

触发器

最新数据

kibana 仪表盘

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-02-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源搬运工宋师傅 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档