【ES私房菜】Logstash: Installation, Deployment, and Common Configurations

1. Overview

Logstash, a member of the Elastic (ES) family, is a powerful data-processing tool. It can transport data, parse and transform it, and emit it in a chosen format, and it has a rich plugin ecosystem. It is most commonly used for log processing.

Logstash's data-processing pipeline is made up of three stages: inputs –> filters –> outputs:

  • inputs: required; generate events. Common plugins: file, syslog, redis, kafka, beats (e.g. Filebeat)
  • filters: optional; modify events. Common plugins: grok, json, mutate, drop, clone, geoip
  • outputs: required; ship events elsewhere. Common plugins: elasticsearch, file, graphite, statsd, kafka

2. Deployment

Installing and starting Logstash is very simple; only the key steps are outlined here:

1) Set up a JDK environment (omitted here)

2) Install Logstash

① Download: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.1.tar.gz

② Unpack the archive

③ Start it:

nohup ./bin/logstash -f config/logstash.conf >/dev/null 2>&1 & 

3. Common input configurations

1) Reading log files

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}

2) Listening for Beats data

input {
    beats {
        port => 5044
    }
}

3) Reading from Redis

input {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash-list"
  }
}

4) Reading from Kafka

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
   kafka {
        bootstrap_servers =>  "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
}

网管这边主要用到kafka,这里着重介绍下:

  • 多模块:如果有多个日志源,即存在多个topics,那么可以写多个kafka模块
  • topics:每个kafka模块指定对应的topics用于读取指定日志
  • group_id:多个logstash应指定为同样的group_id,表示是同一类consumer,避免数据重复
  • codec:kafka存在的都是json数据,因此这里指定为json解码
  • add_field:这里给不同类型数据加上区分字段,后面的filter和output将会用到
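Because these inputs use the json codec, each Kafka message must be a single JSON document. A producer-side sketch in plain Python (the field names are illustrative, not our actual schema; only the payload is built and checked here, with the standard library):

```python
import json

# Shape of a message the json codec above can decode; the field
# names are illustrative (Filebeat populates beat.name / beat.hostname)
event = {
    "message": "Aug  1 12:00:01 web01 sshd[2341]: Accepted password",
    "beat": {"name": "10.0.0.12", "hostname": "web01"},
}
payload = json.dumps(event, ensure_ascii=False).encode("utf-8")

# What the json codec does on the consuming side
decoded = json.loads(payload.decode("utf-8"))
print(decoded["beat"]["name"])  # 10.0.0.12
```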

More details: https://www.elastic.co/guide/en/logstash/5.5/input-plugins.html

4. Common filter configurations

1) MySQL slow log

filter {
    # Here we use the distinguishing field mentioned earlier:
    if [@metadata][type] == "mysql_slow_log" {
        grok {
            # Regex match (slow-log formats differ across MySQL versions; this one fits MySQL 5.5+)
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            # The slow log records a hostname rather than the host IP, so work around it:
            # add a server_ip field whose value is the name configured in the beat
            add_field => [ "server_ip", "%{[beat][name]}" ]
            # Tag events that matched
            add_tag => [ "matched" ]
        }
        # Drop anything that did not match
        if ("matched" not in [tags]) {
           drop {}
        }
        date {
            # Convert the slow log's timestamp into the event timestamp
            match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss" ]
            remove_field => [ "timestamp" ]
        }
        # Normalize the SQL into sql_hash (string literals and numbers stripped),
        # then MD5 it into the fingerprint field so identical query shapes group together
        mutate {
            add_field => {"sql_hash" => "%{query}"}
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
            ]
        }
        fingerprint {
            # Hash the sql_hash field: the field to hash goes in "source"
            # ("key" would switch the plugin into HMAC mode instead)
            source => ["sql_hash"]
            method => "MD5"
        }
        # Remove fields we no longer need
        mutate {
            remove_field => "sql_hash"
            remove_field => "[beat][hostname]"
            remove_field => "[beat][name]"
            remove_field => "@version"
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "message"
        }
    }
}
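The grok pattern above can be sanity-checked offline before deploying. A rough Python translation (grok's USER/IP/NUMBER patterns are approximated with plain character classes, and the sample slow-log entry is fabricated):

```python
import re

# Approximate Python equivalent of the slow-log grok pattern
SLOW_LOG_RE = re.compile(
    r"^#\s+User@Host:\s+(?P<user>[a-zA-Z0-9._-]+)\[[^\]]+\]\s+@\s+"
    r"\[(?P<client_ip>[\d.]*)\]\s*\n"
    r"#\s+Query_time:\s+(?P<query_time>[\d.]+)\s+Lock_time:\s+(?P<lock_time>[\d.]+)\s+"
    r"Rows_sent:\s+(?P<rows_sent>\d+)\s+Rows_examined:\s+(?P<rows_examined>\d+)\n"
    r"SET\s+timestamp=(?P<timestamp>\d+);\n"
    r"\s*(?P<query>(?P<action>\w+)\b.*;)",
    re.DOTALL,
)

sample = (
    "# User@Host: appuser[appuser] @  [10.0.0.12]\n"
    "# Query_time: 2.000000  Lock_time: 0.000100 Rows_sent: 1  Rows_examined: 50000\n"
    "SET timestamp=1500000000;\n"
    "SELECT * FROM orders WHERE status = 'open';"
)

m = SLOW_LOG_RE.match(sample)
print(m.group("action"), m.group("query_time"), m.group("rows_examined"))
```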

The useful details are in the comments above — read them carefully.
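The normalize-then-hash step can also be reproduced outside Logstash, to confirm that two slow queries differing only in literals collapse to one fingerprint. A Python sketch (not the fingerprint plugin itself, just the same normalization and MD5):

```python
import hashlib
import re

def sql_fingerprint(query: str) -> str:
    # Mirror the mutate/gsub normalization: strip quoted strings, then numbers
    normalized = re.sub(r"'.+?'", "", query)
    normalized = re.sub(r"-?\d*\.?\d+", "", normalized)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

# Two executions of the same statement shape collapse to one fingerprint
a = sql_fingerprint("SELECT * FROM t WHERE id = 42 AND name = 'foo';")
b = sql_fingerprint("SELECT * FROM t WHERE id = 7 AND name = 'bar';")
print(a == b)  # True
```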

2) Web access logs

filter {
    # Only handle data tagged web_access_log
    if [@metadata][type] == "web_access_log" {

        # Nginx writes non-ASCII bytes (e.g. in Chinese URLs) as \xHH escapes,
        # which are not valid JSON, so escape the backslash first
        mutate {
            gsub => ["message", "\\x", "\\\x"]
        }

        # Skip HEAD requests
        if ( 'method":"HEAD' in [message] ) {
            drop {}
        }

        # Nginx/Apache are already configured to log JSON, so parsing is simple
        json {
            # Parse the message field
            source => "message"
            # Remove redundant fields
            remove_field => "message"
            remove_field => "[beat][hostname]"
            remove_field => "[beat][name]"
            remove_field => "@version"
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }
    }
}
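Why the gsub is needed can be shown in a few lines of Python: a raw line containing Nginx's `\xHH` escape fails to parse as JSON until the backslash is doubled (the log line below is made up):

```python
import json

# A raw access-log line as Nginx would write it: \x22 is Nginx's
# hex escape for a double quote, but JSON has no \x escape.
raw = '{"method":"GET","uri":"/search?q=\\x22hello\\x22"}'

try:
    json.loads(raw)
    parsed_raw = True
except json.JSONDecodeError:
    parsed_raw = False  # fails: \x is an invalid JSON escape

# Same substitution as the mutate/gsub above: \x -> \\x
fixed = raw.replace("\\x", "\\\\x")
doc = json.loads(fixed)
print(parsed_raw, doc["method"])  # False GET
```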
 

3) System logs

Much the same as the above, so no inline comments this time.

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            # Parse the field grok captured above (message_timestamp, not timestamp)
            match => [ "message_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            # Logstash 5.x removed the legacy event[...] accessors; use event.get/event.set
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.localtime))"
        }
        mutate {
            remove_field => "[beat][hostname]"
            remove_field => "[beat][name]"
            remove_field => "@version"
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }
    }
}
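As with the slow log, the syslog grok line can be checked offline. An approximate Python equivalent (SYSLOGTIMESTAMP/SYSLOGHOST/DATA/POSINT are replaced by plain character classes; the sample line is fabricated):

```python
import re

# Rough stand-ins for SYSLOGTIMESTAMP, SYSLOGHOST, DATA, POSINT, GREEDYDATA
SYSLOG_RE = re.compile(
    r"(?P<ts>[A-Z][a-z]{2}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) (?P<prog>[^:\[]+)(?:\[(?P<pid>\d+)\])?: (?P<content>.*)"
)

line = "Aug  1 12:00:01 web01 sshd[2341]: Accepted password for deploy"
m = SYSLOG_RE.match(line)
print(m.group("host"), m.group("prog"), m.group("pid"))  # web01 sshd 2341
```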

More details: https://www.elastic.co/guide/en/logstash/5.5/filter-plugins.html

5. Common output configurations

1) Print to the console (debugging)

output {
    stdout{
        codec => rubydebug
    }
}

2) Shipping to Elasticsearch

output {
    elasticsearch {
        hosts => ["x.x.x.1:9200","x.x.x.2:9200","x.x.x.3:9200"]
        # Name the index, partitioned by day
        index => "logstash-%{type}-%{+YYYY.MM.dd}"
        # Let Logstash manage (overwrite) the ES template
        template_overwrite => true
    }
}
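The `%{+YYYY.MM.dd}` sprintf reference expands from each event's `@timestamp` (in UTC), giving one index per day. A quick Python sketch of the resulting names (the helper function is ours, not a Logstash API):

```python
from datetime import datetime, timezone

def daily_index(prefix: str, ts: datetime) -> str:
    # Mirrors index => "<prefix>-%{+YYYY.MM.dd}": one index per UTC day
    return f"{prefix}-{ts.astimezone(timezone.utc).strftime('%Y.%m.%d')}"

print(daily_index("logstash-system", datetime(2017, 8, 1, 23, 30, tzinfo=timezone.utc)))
# logstash-system-2017.08.01
```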

3) Shipping to Kafka

output {
    kafka {
        topic_id => "hello"
        bootstrap_servers => "x.x.x.x:9092" # Kafka broker address
        batch_size => 5
    }
}

4) Our production output configuration:

output {
    # Ship system logs
    if [@metadata][type] == "messages" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # Logstash does not manage the template; use the one already stored in ES
            manage_template => false
            template_name => "template-messages"
        }
    }
    # Ship MySQL slow logs
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    # Ship web access logs
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}

More details: https://www.elastic.co/guide/en/logstash/5.5/output-plugins.html

6. Appendix

Finally, here is our complete Logstash configuration, for reference only:

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "web_access_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "web_access_log" }
    }
}

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "message_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.localtime))"
        }
        mutate {
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }

    if [@metadata][type] == "mysql_slow_log" {
        grok {
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            add_field => [ "server_ip", "%{[beat][name]}" ]
            add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            match => [ "timestamp", "UNIX","YYYY-MM-dd HH:mm:ss"]
            remove_field => [ "timestamp" ]
        }
        mutate {
            add_field => {"sql_hash" => "%{query}"}
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
              ]
        }
        fingerprint {
            source => ["sql_hash"]
            method => "MD5"
        }
        mutate {
            remove_field => "sql_hash"      
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "message"
        } 
     }

    if [@metadata][type] == "web_access_log" {
      mutate {  
        gsub => ["message", "\\x", "\\\x"]
      }
      json {
            source => "message"
            remove_field => "message"          
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }
    }
}

output {
    if [@metadata][type] == "messages" {   
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-messages"
        }
    }
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}

Original-content declaration: this article was published on Cloud+ Community (云+社区) with the author's authorization; reproduction without permission is prohibited.
