
Integrating an ELK Log Analysis System with a Kafka/Zookeeper Cluster

Kevin song · 2020-04-27

Cluster host environment

| Hostname | Server IP | Software Version | Role |
| --- | --- | --- | --- |
| elk-node1 | 192.168.99.185 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, kibana-6.8.4-1.x86_64, openjdk 1.8.0_242 | es master/data node, kibana web, logstash |
| elk-node2 | 192.168.99.186 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, openjdk 1.8.0_242 | es data node, logstash |
| kafka-node1 | 192.168.99.233 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| kafka-node2 | 192.168.99.232 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| kafka-node3 | 192.168.99.221 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk 1.8.0_242 | kafka/zookeeper |
| zabbix-server | 192.168.99.50 | filebeat-6.8.4-1.x86_64 | filebeat |

Log collection and analysis system architecture

ELK cluster configuration

For ELK cluster deployment and configuration, refer to the article 《ELK集群部署》 in the official account's ELK column.

Kafka cluster configuration

For the kafka/zookeeper cluster configuration, refer to the article 《KafKa 工作原理 && 集群部署(一)》 in the official account's ELK column.

Note: starting with Zookeeper 3.5.5, the release archives whose names contain "bin.tar.gz" are pre-compiled binary packages that can be used directly, while the plain "tar.gz" archives contain only source code and cannot be used as-is. With the "apache-zookeeper-3.5.7.tar.gz" package, starting the service with zkServer.sh start fails and client connections error out. The correct package is "apache-zookeeper-3.5.7-bin.tar.gz".
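
A minimal sketch of fetching the binary package and bringing the service up (the mirror URL and install path here are illustrative assumptions, not from the original):

```bash
# Download the pre-compiled binary package (note the -bin suffix)
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt
cd /opt/apache-zookeeper-3.5.7-bin
cp conf/zoo_sample.cfg conf/zoo.cfg   # start from the sample config
# Start the service and confirm its role (leader/follower/standalone)
./bin/zkServer.sh start
./bin/zkServer.sh status
```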

Network device log server configuration

For the Rsyslog network log server configuration, refer to the article 《ELK 部署可视化网络日志分析监控平台》 in the official account's ELK column.

Filebeat config

Filebeat acts as the Kafka message producer. On the filebeat host the logs fall into two groups, network device logs and Linux system logs. Each network device vendor and each kind of Linux system log is distinguished with a tags label, so that logstash can match on tags and apply a different field-cleaning rule to each. The two groups are also published to the Kafka cluster under different log_topic values: network device logs use log_topic=network, and Linux system logs use log_topic=linuxos.

```yaml
# egrep -v "^#|^$" /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /mnt/huawei/*
  fields:
    log_topic: network
  tags: ["huawei"]
  include_lines: ['Failed','failed','error','ERROR','\bDOWN\b','\bdown\b','\bUP\b','\bup\b']
- type: log
  paths:
    - /mnt/h3c/*
  fields:
    log_topic: network
  tags: ["h3c"]
  include_lines: ['Failed','failed','error','ERROR','\bDOWN\b','\bdown\b','\bUP\b','\bup\b']
- type: log
  paths:
    - /mnt/ruijie/*
  fields:
    log_topic: network
  tags: ["ruijie"]
  include_lines: ['Failed','failed','error','ERROR','\bDOWN\b','\bdown\b','\bUP\b','\bup\b']
- type: log
  enabled: true
  tags: ["secure"]
  paths:
    - /var/log/secure
  fields:
    log_topic: linuxos
  include_lines: [".*Failed.*",".*Accepted.*"]
- type: log
  enabled: true
  paths:
    - /var/log/messages
  fields:
    log_topic: linuxos
  tags: ["messages"]
  include_lines: ['Failed','error','ERROR']
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
name: 192.168.99.185
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.kafka:
  enabled: true
  hosts: ["192.168.99.233:9092","192.168.99.232:9092","192.168.99.221:9092"]
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000
processors:
  - drop_fields:
      fields: ["beat","input","host","log","source","name","os"]
  - add_host_metadata: ~
  - add_cloud_metadata: ~
```

Note: filebeat's Kafka output may fail to connect to the Kafka cluster. Watch the filebeat log, whose path is /var/log/filebeat/filebeat, with "tail -f /var/log/filebeat/filebeat".
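
If the connection keeps failing, it can help to verify that each broker's port 9092 is reachable from the filebeat host; a quick sketch (assumes nc from nmap-ncat or similar is installed):

```bash
# Probe each Kafka broker port from the filebeat host
for broker in 192.168.99.233 192.168.99.232 192.168.99.221; do
  nc -zv "$broker" 9092
done
# Then follow the filebeat log for producer errors
tail -f /var/log/filebeat/filebeat
```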

logstash config

The two logstash instances act as Kafka message consumers: host 192.168.99.185 cleans the network device logs and host 192.168.99.186 cleans the Linux system logs (both cleaning pipelines could, of course, run on a single logstash). The two configurations below include an output to Zabbix for alerting; if you do not need to integrate with a Zabbix alerting platform, remove the Zabbix-related parts.

The logstash log path is "/var/log/logstash/logstash-plain.log"; view it with "tail -f /var/log/logstash/logstash-plain.log".
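
Before (re)starting the service it is worth syntax-checking a pipeline file. With the RPM-default install paths, a sketch:

```bash
# Validate pipeline syntax without starting the pipeline
/usr/share/logstash/bin/logstash --path.settings /etc/logstash \
  -f /etc/logstash/conf.d/network.conf --config.test_and_exit
```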

network logstash

```conf
[root@elk-node1 conf.d]# cat network.conf
input {
  kafka {
    codec => "json"
    topics => ["network"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "logstash"
  }
}
filter {
  if "huawei" in [tags] {
    grok {
      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}
    }
  }
  else if "h3c" in [tags] {
    grok {
      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{YEAR:year} %{DATA:hostname} %{GREEDYDATA:info}"}
    }
  }
  else if "ruijie" in [tags] {
    grok {
      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}
    }
  }
  mutate {
    add_field => [ "[zabbix_key]", "networklogs" ]
    add_field => [ "[zabbix_host]", "192.168.99.185" ]
    add_field => [ "count", "%{hostname}:%{info}" ]
    remove_field => ["message","time","year","offset","tags","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "networklogs-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200"]
    user => "elastic"
    password => "qZXo7E"
    sniffing => false
  }
  if [count] =~ /(ERR|error|ERROR|Failed|failed)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count"
    }
  }
}
```
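
For the zabbix output to deliver anything, the Zabbix server must have a trapper item with key networklogs on a host named 192.168.99.185. A manual test send with the stock zabbix_sender tool (assumed installed on a host that can reach the Zabbix server) looks like:

```bash
# Push a test value to the trapper item; -s must match zabbix_host, -k must match zabbix_key
zabbix_sender -z 192.168.99.200 -p 10051 -s "192.168.99.185" -k networklogs -o "test: manual trapper check"
```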

linuxos logstash

```conf
[root@elk-node2 ~]# cat /etc/logstash/conf.d/system.conf
input {
  kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
  }
}
filter {
  if "secure" in [tags] {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?\: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
      }
    }
    mutate {
      add_field => [ "[zabbix_key]", "securelogs" ]
      add_field => [ "[zabbix_host]", "192.168.99.186" ]
      add_field => [ "count1", "%{host1}--%{message}" ]
    }
  }
  else if "messages" in [tags] {
    grok {
      match => {
        "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"
      }
    }
  }
  mutate {
    remove_field => ["time","offset","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}
output {
  stdout { codec => rubydebug }
  if "secure" in [tags] {
    elasticsearch {
      index => "secure-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if "messages" in [tags] {
    elasticsearch {
      index => "messages-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if [count1] =~ /(Failed|Accepted)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count1"
    }
  }
}
```
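
When tuning the /var/log/secure grok pattern, it is convenient to replay one sample sshd line through a throwaway stdin pipeline before touching the live config. A sketch (the sample log line is invented for illustration):

```bash
# Feed one sample sshd line through the same grok pattern and print the parsed fields
echo 'Apr 21 10:00:00 elk-node2 sshd[1234]: Failed password for root from 10.0.0.8 port 22 ssh2' | \
/usr/share/logstash/bin/logstash -e '
input { stdin {} }
filter { grok { match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?\: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?" } } }
output { stdout { codec => rubydebug } }'
```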

Kafka cluster verification tests

View the topics created by filebeat

```bash
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --list --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181
__consumer_offsets
linuxos
network
```
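
To also check partition count and replica placement, the same tool supports --describe:

```bash
# Show partitions, leaders, and replica assignment for the network topic
./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic network
```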

Verify that consumers receive messages

```bash
# Consume the network topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic network --from-beginning
# Consume the linuxos topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic linuxos --from-beginning
```
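
Because the network pipeline consumes with group_id "logstash", per-partition consumer lag can be inspected with the stock kafka-consumer-groups.sh tool:

```bash
# Show current offset, log-end offset, and lag per partition for the logstash group
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.233:9092 --describe --group logstash
```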

View logstash's cleaned field output

```bash
tail -f /var/log/messages
```
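
Both pipelines also emit a rubydebug copy of every event to stdout; when logstash runs as a systemd service, that stdout lands in the journal, so (assuming a systemd-managed install) the same output can be followed with:

```bash
# Follow logstash's stdout (rubydebug events) via the systemd journal
journalctl -u logstash -f
```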

Kibana Web UI

For Kibana login authentication, refer to the article 《Elastic Stack 6.8 X-Pack 安全功能部署》 in the official account's ELK column.

user authentication

Discover networklogs index

Discover secure index

Discover messages index

Network device log dashboard
