
Deploying an ELK Log Analysis System on CentOS 7.6

Author: 星哥玩云 · Published 2022-07-28

This post documents the steps I followed to deploy an ELK log analysis system on CentOS 7.6; I hope it is helpful.

Download Elasticsearch

Create the elk user and set ownership:

useradd elk
chown -R elk:elk /home/elk/elasticsearch
chown -R elk:elk /home/elk/elasticsearch1
chown -R elk:elk /home/elk/elasticsearch2
mkdir -p /home/eladata
mkdir -p /var/log/elk
chown -R elk:elk /home/eladata
chown -R elk:elk /var/log/elk
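Elasticsearch's bootstrap checks require a higher mmap count and open-file limit than the CentOS defaults, so it is worth raising them before the first start. A minimal sketch, assuming stock defaults:

# mmap count required by Elasticsearch (bootstrap check expects >= 262144)
echo 'vm.max_map_count=262144' >> /etc/sysctl.conf
sysctl -p

# open-file limit for the elk user (used by the manual su-based start)
cat >> /etc/security/limits.conf <<'EOF'
elk soft nofile 65536
elk hard nofile 65536
EOF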

Master node

Unpack Elasticsearch and edit the configuration file in /home/elk/elasticsearch/config:

[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node0
node.master: true
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9200
transport.tcp.port: 9301
discovery.zen.minimum_master_nodes: 1
cluster.initial_master_nodes: ["node0"]

Manual start command:

su elk -l -c '/home/elk/elasticsearch/bin/elasticsearch -d'
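A quick way to confirm the daemon came up is to hit its HTTP port (address and port taken from the elasticsearch.yml above):

curl http://192.168.1.70:9200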

systemd unit file elasticsearch.service:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch
Environment=ES_PATH_CONF=/home/elk/elasticsearch/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target
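After adding the unit file, reload systemd and enable the service (standard systemd workflow; the same steps apply to the elasticsearch1 and elasticsearch2 units below):

systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
systemctl status elasticsearch.service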


Node1, configuration in /home/elk/elasticsearch1/config:

[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node1
node.master: false
node.attr.rack: r1
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
transport.tcp.port: 9303
http.port: 9302
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]

Manual start command:

su elk -l -c '/home/elk/elasticsearch1/bin/elasticsearch -d'

systemd unit file elasticsearch1.service:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch1.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch1
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch1
Environment=ES_PATH_CONF=/home/elk/elasticsearch1/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch1
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch1/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target


Node2, configuration in /home/elk/elasticsearch2/config:

[root@localhost config]# grep -v "^#" elasticsearch.yml
cluster.name: my-application
node.name: node2
node.attr.rack: r1
node.master: false
node.max_local_storage_nodes: 3
path.data: /home/eladata
path.logs: /var/log/elk
http.cors.enabled: true
http.cors.allow-origin: "*"
network.host: 192.168.1.70
http.port: 9203
transport.tcp.port: 9304
discovery.zen.ping.unicast.hosts: ["192.168.1.70:9301"]
discovery.zen.minimum_master_nodes: 1

Manual start command:

su elk -l -c '/home/elk/elasticsearch2/bin/elasticsearch -d'

systemd unit file elasticsearch2.service:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat elasticsearch2.service
[Unit]
Description=Elasticsearch
Documentation=http://www.elastic.co
Wants=network-online.target
After=network-online.target

[Service]
RuntimeDirectory=elasticsearch2
PrivateTmp=true
Environment=ES_HOME=/home/elk/elasticsearch2
Environment=ES_PATH_CONF=/home/elk/elasticsearch2/config
Environment=PID_DIR=/var/run/elasticsearch
EnvironmentFile=-/etc/sysconfig/elasticsearch
WorkingDirectory=/home/elk/elasticsearch2
User=elk
Group=elk
ExecStart=/home/elk/elasticsearch2/bin/elasticsearch -p ${PID_DIR}/elasticsearch.pid --quiet
StandardOutput=journal
StandardError=inherit
LimitNOFILE=65536
LimitNPROC=4096
LimitAS=infinity
LimitFSIZE=infinity
TimeoutStopSec=0
KillSignal=SIGTERM
KillMode=process
SendSIGKILL=no
SuccessExitStatus=143

[Install]
WantedBy=multi-user.target

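With all three nodes started, cluster membership and health can be checked against the master's HTTP port (9200, as configured above):

curl 'http://192.168.1.70:9200/_cat/nodes?v'
curl 'http://192.168.1.70:9200/_cluster/health?pretty'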

Download Logstash

The directory layout is as follows; the default configuration is fine:

[root@localhost logstash]# pwd
/home/elk/logstash

Manual start commands:

./logstash -f ../dev.conf
nohup ./logstash -f ../dev.conf &
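Before leaving it running in the background, Logstash can validate the pipeline file and exit (a standard Logstash flag; dev.conf is the pipeline written later in this article):

./logstash -f ../dev.conf --config.test_and_exit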

Download Kibana

The configuration file is as follows:

[root@localhost config]# pwd
/home/elk/kibana/config
[root@localhost config]# grep -v "^#" kibana.yml
server.host: "192.168.1.70"
elasticsearch.hosts: ["http://192.168.1.70:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"

Manual start commands:

./kibana
nohup ./kibana &

Kibana systemd unit file:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kibana.service
[Unit]
Description=Kibana Server Manager

[Service]
ExecStart=/home/elk/kibana/bin/kibana

[Install]
WantedBy=multi-user.target
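As with the Elasticsearch units, reload systemd and enable the service; Kibana's status API is a quick health check (host and port from the kibana.yml above):

systemctl daemon-reload
systemctl enable kibana.service
systemctl start kibana.service
curl http://192.168.1.70:5601/api/status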

Kibana listens on port 5601; browse to 192.168.1.70:5601 to access it.

Install elasticsearch-head:

yum install git npm
git clone https://github.com/mobz/elasticsearch-head.git
[root@localhost elasticsearch-head]# pwd
/home/elk/elasticsearch-head

Start it:

npm install
npm run start
nohup npm run start &

curl -XPUT '192.168.2.67:9100/book'

Browse to 192.168.2.67:9100 to access the elasticsearch-head UI.
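If the PUT above succeeded, the new index should also be visible directly from Elasticsearch (using the cluster address configured earlier in this article; the book index name comes from the command above):

curl 'http://192.168.1.70:9200/_cat/indices?v'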

Download Kafka

Edit the configuration file as follows:

[root@localhost config]# pwd
/home/elk/kafka/config
[root@localhost config]# grep -v "^#" server.properties
broker.id=0
listeners=PLAINTEXT://192.168.1.70:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/log/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Start the ZooKeeper bundled with Kafka

Manual start:

[root@localhost bin]# pwd
/home/elk/kafka/bin
[root@localhost bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties

Starting ZooKeeper via systemctl:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat zookeeper.service
[Service]
Type=forking
SyslogIdentifier=zookeeper
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/zookeeper-server-start.sh -daemon /home/elk/kafka/config/zookeeper.properties
ExecStop=/home/elk/kafka/bin/zookeeper-server-stop.sh

Start the Kafka service

Manual start:

./kafka-server-start.sh ../config/server.properties

Starting Kafka via systemctl:

[root@localhost system]# pwd
/lib/systemd/system
[root@localhost system]# cat kafka.service
[Unit]
Description=Apache kafka
After=network.target

[Service]
Type=simple
Restart=always
RestartSec=0s
ExecStart=/home/elk/kafka/bin/kafka-server-start.sh /home/elk/kafka/config/server.properties
ExecStop=/home/elk/kafka/bin/kafka-server-stop.sh
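With both unit files in place, reload systemd and start ZooKeeper before Kafka (note that, as written, neither unit has an [Install] section, so systemctl enable would require adding one first):

systemctl daemon-reload
systemctl start zookeeper
systemctl start kafka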

Test Kafka

Create a topic named test:

./kafka-topics.sh --create --zookeeper 192.168.1.70:2181 --replication-factor 1 --partitions 1 --topic test

List the topics in Kafka:

./kafka-topics.sh --list --zookeeper 192.168.1.70:2181

Produce messages to the test topic:

./kafka-console-producer.sh --broker-list 192.168.1.70:9092 --topic test

Consume messages from the test topic:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.70:9092 --topic test --from-beginning

If the messages you produce are received on the consumer side, Kafka is working correctly.
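As a further sanity check, describe the topic to confirm its partition and replica assignment (same ZooKeeper address as above):

./kafka-topics.sh --describe --zookeeper 192.168.1.70:2181 --topic test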

Install Filebeat on the target machine

Install version 6.5. The configuration is as follows:

[root@localhost filebeat]# pwd
/usr/local/filebeat
[root@localhost filebeat]# cat filebeat.yml
filebeat.prospectors:
- type: log
  paths:
    - /opt/logs/workphone-tcp/catalina.out
  fields:
    tag: 54_tcp_catalina_out
- type: log
  paths:
    - /opt/logs/workphone-webservice/catalina.out
  fields:
    tag: 54_web_catalina_out

name: 192.168.1.54

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

output.kafka:
  hosts: ["192.168.1.70:9092"]
  topic: "filebeat-log"
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1

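Before leaving Filebeat running, it can verify both the configuration file and the Kafka output (standard filebeat subcommands, run from the install directory shown above):

cd /usr/local/filebeat
./filebeat test config -c filebeat.yml
./filebeat test output -c filebeat.yml
nohup ./filebeat -e -c filebeat.yml &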

After Filebeat is installed, go back to Logstash and edit its configuration file.

Logstash pipeline:

[root@localhost logstash]# pwd
/home/elk/logstash
[root@localhost logstash]# cat dev.conf
input {
  kafka {
    bootstrap_servers => "192.168.1.70:9092"
    topics => ["filebeat-log"]
    codec => "json"
  }
}

filter {
  if [fields][tag] == "jpwebmap" {
    json {
      source => "message"
      remove_field => "message"
    }
    geoip {
      source => "client"
      target => "geoip"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }

  if [fields][tag] == "54_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "54_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_tcp_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "55_web_catalina_out" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  }

  if [fields][tag] == "51_nginx80_access_log" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr" , "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/home/elk/logstash/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  }
}

output {
  if [fields][tag] == "wori" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "zabbix"
    }
  }
  if [fields][tag] == "54_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_tcp_catalina_out"
    }
  }
  if [fields][tag] == "54_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "54_web_catalina_out"
    }
  }
  if [fields][tag] == "55_tcp_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_tcp_catalina_out"
    }
  }
  if [fields][tag] == "55_web_catalina_out" {
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "55_web_catalina_out"
    }
  }
  if [fields][tag] == "51_nginx80_access_log" {
    stdout {}
    elasticsearch {
      hosts => ["192.168.1.70:9200"]
      index => "51_nginx80_access_log"
    }
  }
}
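Once Filebeat is shipping logs through Kafka and Logstash is running with this pipeline, the indices named in the output section should start to appear in Elasticsearch and can then be added as index patterns in Kibana. A quick check (cluster address as configured above; the 5* wildcard matches the 51_/54_/55_ indices defined in the output):

curl 'http://192.168.1.70:9200/_cat/indices/5*?v'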

Other configuration files

index.conf:

filter {
  mutate {
    add_field => { "spstr" => "%{[log][file][path]}" }
  }
  mutate {
    split => ["spstr" , "/"]
    # save the last element of the array as the api_method.
    add_field => ["src", "%{[spstr][-1]}" ]
  }
  mutate {
    remove_field => [ "friends", "ecs", "agent" , "spstr" ]
  }
}

Java.conf:

filter {
  if [fields][tag] == "java" {
    grok {
      match => ["message", "%{TIMESTAMP_ISO8601:logdate}"]
    }
    date {
      match => ["logdate", "ISO8601"]
    }
    mutate {
      remove_field => [ "logdate" ]
    }
  } # end if
}

kafkainput.conf:

input {
  kafka {
    bootstrap_servers => "172.16.11.68:9092"
    #topics => ["ql-prod-tomcat" ]
    topics => ["ql-prod-dubbo","ql-prod-nginx","ql-prod-tomcat" ]
    codec => "json"
    consumer_threads => 5
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "logstash"
    #client_id => ""
    ############################# HELK Optimizing Latency #############################
    fetch_min_bytes => "1"
    request_timeout_ms => "305000"
    ############################# HELK Optimizing Availability #############################
    session_timeout_ms => "10000"
    max_poll_records => "550"
    max_poll_interval_ms => "300000"
  }
}

#input {
#  kafka{
#    bootstrap_servers => "172.16.11.68:9092"
#    topics => ["ql-prod-java-dubbo","ql-prod","ql-prod-java" ]
#    codec => "json"
#    consumer_threads => 15
#    decorate_events => true
#    auto_offset_reset => "latest"
#    group_id => "logstash-1"
#    ############################# HELK Optimizing Latency #############################
#    fetch_min_bytes => "1"
#    request_timeout_ms => "305000"
#    ############################# HELK Optimizing Availability #############################
#    session_timeout_ms => "10000"
#    max_poll_records => "550"
#    max_poll_interval_ms => "300000"
#  }
#}

nginx.conf:

filter {
  if [fields][tag] == "nginx-access" {
    mutate {
      add_field => { "spstr" => "%{[log][file][path]}" }
    }
    mutate {
      split => ["spstr" , "/"]
      # save the last element of the array as the api_method.
      add_field => ["src", "%{[spstr][-1]}" ]
    }
    mutate {
      remove_field => [ "friends", "ecs", "agent" , "spstr" ]
    }
    grok {
      match => { "message" => "%{IPORHOST:remote_addr} - %{DATA:remote_user} \[%{HTTPDATE:time}\] \"%{WORD:method} %{DATA:url} HTTP/%{NUMBER:http_version}\" %{NUMBER:response_code} %{NUMBER:body_sent:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\" \"%{DATA:x_forwarded_for}\" \"%{NUMBER:request_time}\" \"%{DATA:upstream_addr}\" \"%{DATA:upstream_status}\"" }
      remove_field => "message"
    }
    date {
      match => ["time", "dd/MMM/yyyy:HH:mm:ss Z"]
      target => "@timestamp"
    }
    geoip {
      source => "x_forwarded_for"
      target => "geoip"
      database => "/opt/logstash-6.2.4/GeoLite2-City.mmdb"
      add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
    }
    mutate {
      convert => [ "[geoip][coordinates]", "float" ]
    }
  } # end if
}

output.conf:

output {
  if [fields][tag] == "nginx-access" {
    stdout {}
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.67:9200"]
      index => "logstash-%{[fields][proname]}-%{+YYYY.MM.dd}"
    }
  }
  #stdout{}
  if [fields][tag] == "java" {
    elasticsearch {
      user => elastic
      password => WR141bp2sveJuGFaD4oR
      hosts => ["172.16.11.66:9200","172.16.11.68:9200"]
      index => "%{[host][name]}-%{[src]}"
    }
  }
}
