【ES私房菜】Logstash Installation, Deployment, and Common Configuration

1. Overview

Logstash, a member of the Elastic (ES) family, is a powerful data processing tool. It can transport data, process and normalize formats, and emit formatted output, and it offers a rich plugin ecosystem; it is most commonly used for log processing.

The Logstash data processing pipeline consists of three main stages, inputs –> filters –> outputs (a minimal end-to-end sketch follows the list):

  • inputs: required; they produce events ("inputs generate events"). Common plugins: file, syslog, redis, kafka, beats (e.g. Filebeat)
  • filters: optional; they process and transform data ("filters modify them"). Common plugins: grok, json, mutate, drop, clone, geoip
  • outputs: required; they ship data out ("outputs ship them elsewhere"). Common plugins: elasticsearch, file, graphite, statsd, kafka
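
Putting the three stages together, here is a minimal end-to-end sketch of a logstash.conf (stdin, mutate, and stdout are standard Logstash plugins; the added field is purely illustrative and is not used elsewhere in this article):

input {
    stdin { }
}
filter {
    mutate {
        # illustrative field, not required by anything later in this article
        add_field => { "pipeline" => "demo" }
    }
}
output {
    stdout {
        codec => rubydebug
    }
}

Type a line into the terminal and the structured event is printed back, which makes this a handy scaffold for trying out the plugins below.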

2. Deployment

Installing and starting Logstash is straightforward, so here are just the key steps:

1. Deploy the JDK environment (omitted)

2. Install Logstash

① Download: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.1.tar.gz

② Extract the archive

③ Start

nohup ./bin/logstash -f config/logstash.conf >/dev/null 2>&1 & 
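
Before backgrounding it with nohup, it is worth checking the configuration syntax first; the -t (--config.test_and_exit) flag performs a dry run and exits (assuming the same paths as above):

./bin/logstash -f config/logstash.conf -t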

3. Common input configurations

1. Reading log files

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}
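
One caveat: start_position => "beginning" only applies to files Logstash has not seen before, because read offsets are persisted in a sincedb file. For repeatable testing you can discard the offsets, as in this sketch (sincedb_path is a standard file-input option; pointing it at /dev/null is a common testing trick, not part of the original setup):

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
    # testing only: never persist offsets, so files are re-read on every restart
    sincedb_path => "/dev/null"
  }
}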

2. Listening for Beats data

input {
    beats {
        port => 5044
    }
}

3. Reading data from Redis

input {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash-list"
  }
}

4. Reading data from Kafka

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
}

Our NetOps team mainly uses Kafka, so here is a closer look:

  • Multiple blocks: if there are several log sources, i.e. several topics, you can declare a kafka block for each
  • topics: each kafka block names the topics it reads its logs from
  • group_id: multiple Logstash instances should share the same group_id so they act as one consumer group and avoid consuming duplicates
  • codec: everything stored in Kafka here is JSON, so the json codec is specified for decoding
  • add_field: adds a field that distinguishes each data type; the filter and output stages rely on it later (see the debugging sketch below)
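
Because [@metadata] fields never appear in the emitted event, they are easy to misdiagnose. A quick way to inspect them while debugging is the rubydebug codec's metadata option, as in this minimal sketch (stdout and the metadata flag are standard Logstash features):

output {
    stdout {
        codec => rubydebug { metadata => true }
    }
}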

More details: https://www.elastic.co/guide/en/logstash/5.5/input-plugins.html

4. Common filter configurations

1. MySQL slow log

filter {
    # This is where the distinguishing field from earlier comes in:
    if [@metadata][type] == "mysql_slow_log" {
        grok {
            # Regex match (slow-log formats differ between MySQL versions; this one fits MySQL 5.5+)
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            # The slow log records the host as a hostname, so work around it by adding a server_ip field filled from the Beats name
            add_field => [ "server_ip", "%{[beat][name]}" ]
            # Tag events that matched
            add_tag => [ "matched" ]
        }
        # Drop anything that did not match
        if ("matched" not in [tags]) {
           drop {}
        }
        date {
            # Convert the slow log's timestamp into the event time
            match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss" ]
            remove_field => [ "timestamp" ]
        }
        # MD5 the normalized SQL into the fingerprint field, so identical statements can be grouped
        mutate {
            add_field => { "sql_hash" => "%{query}" }
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
              ]
        }
        fingerprint {
            # source selects the field to hash (the original used key, which is the HMAC secret, not the input)
            method => "MD5"
            source => "sql_hash"
        }
        # Remove fields that are no longer needed
        mutate {
            remove_field => [ "sql_hash", "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "message" ]
        }
     }
}

The useful explanations are all in the comments, so read through them carefully.
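
For reference, here is a schematic slow-log entry of the shape the grok pattern expects; the values are hypothetical, and the shipper must deliver these lines as a single multiline event or the (?m) pattern never sees them together:

# User@Host: app_user[app_user] @ [10.0.0.8]
# Query_time: 2.384000  Lock_time: 0.000120 Rows_sent: 1  Rows_examined: 31848
SET timestamp=1501234567;
SELECT * FROM orders WHERE customer_id = 42;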

2. Web access logs

filter {
    # Only process data tagged web_access_log
    if [@metadata][type] == "web_access_log" {

        # Escape \x sequences so that hex-encoded (e.g. Chinese) paths remain valid JSON
        mutate {
            gsub => ["message", "\\x", "\\\x"]
        }

        # Discard HEAD requests
        if ( 'method":"HEAD' in [message] ) {
            drop {}
        }

        # Nginx and Apache have already been set up to log in JSON format, so parsing is simple
        json {
            # Parse the JSON out of the message field
            source => "message"
            # Remove redundant fields
            remove_field => [ "message", "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "host" ]
        }
    }
}
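
For reference, a hypothetical JSON access-log line of the kind this filter expects; the field names are illustrative, since the article does not show the exact Nginx/Apache log_format:

{"@timestamp":"2017-08-01T10:20:30+08:00","method":"GET","uri":"/index.html","status":200,"remote_addr":"10.0.0.8"}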
 

3. System logs

This one is much the same as the previous two, so it is left mostly uncommented; two small fixes are flagged inline.

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            # grok captured the time into message_timestamp, so match on that field
            match => [ "message_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            # Logstash 5.x replaced event['field'] with event.get/event.set
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.localtime))"
        }
        mutate {
            remove_field => [ "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "host" ]
        }
    }
}
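
For reference, a hypothetical /var/log/messages line that the grok pattern above would match:

Aug  1 10:20:30 host01 sshd[12345]: Accepted password for root from 10.0.0.8 port 22 ssh2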

More details: https://www.elastic.co/guide/en/logstash/5.5/filter-plugins.html

5. Common output configurations

1. Print to screen (debugging)

output {
    stdout{
        codec => rubydebug
    }
}

2. Shipping to ES

output {
    elasticsearch {
        hosts => ["x.x.x.1:9200","x.x.x.2:9200","x.x.x.3:9200"]
        # Name the index, split by day (note: this interpolates the event's type field)
        index => "logstash-%{type}-%{+yyyy.MM.dd}"
        # Let Logstash manage and overwrite the ES template
        template_overwrite => true
    }
}

3. Shipping to Kafka

output {
    kafka {
        topic_id => "hello"
        bootstrap_servers => "x.x.x.x:9092" # Kafka broker address
        batch_size => 5
    }
}

4. Example configuration on our NetOps side:

output {
    # Ship system logs
    if [@metadata][type] == "messages" {   
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # Logstash does not manage the template; use the one already saved in ES
            manage_template => false
            template_name => "template-messages"
        }
    }
    # Ship MySQL slow logs
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    # Ship web access logs
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}

More details: https://www.elastic.co/guide/en/logstash/5.5/output-plugins.html

6. Appendix

Finally, here is the complete Logstash configuration from our NetOps side, for reference only:

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "web_access_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "web_access_log" }
    }
}

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            # match on the message_timestamp field captured by grok above
            match => [ "message_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            # Logstash 5.x event API (event.get/event.set)
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.localtime))"
        }
        mutate {
            remove_field => [ "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "host" ]
        }
    }

    if [@metadata][type] == "mysql_slow_log" {
        grok {
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time
:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<a
ction>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            add_field => [ "server_ip", "%{[beat][name]}" ]
            add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            match => [ "timestamp", "UNIX","YYYY-MM-dd HH:mm:ss"]
            remove_field => [ "timestamp" ]
        }
        mutate {
            add_field => {"sql_hash" => "%{query}"}
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
              ]
        }
        fingerprint {
            method => "MD5"
            source => "sql_hash"
        }
        mutate {
            remove_field => [ "sql_hash", "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "message" ]
        }
     }

    if [@metadata][type] == "web_access_log" {
        mutate {
            gsub => ["message", "\\x", "\\\x"]
        }
        json {
            source => "message"
            remove_field => [ "message", "[beat][hostname]", "[beat][name]", "@version", "[beat][version]", "input_type", "offset", "tags", "type", "host" ]
        }
    }
}

output {
    if [@metadata][type] == "messages" {   
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-messages"
        }
    }
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}
