
【ES私房菜】Logstash Installation, Deployment and Common Configuration


I. Overview

Logstash is part of the Elastic (ES) family. It is a powerful data-processing tool that handles data transport, parsing and transformation, and formatted output, backed by a rich plugin system, and it is commonly used for log processing.

The Logstash data-processing pipeline consists of three main stages: inputs –> filters –> outputs:

[Figure: the Logstash pipeline — inputs → filters → outputs]
  • inputs: required; they generate events. Common plugins: file, syslog, redis, kafka, beats (e.g. Filebeat)
  • filters: optional; they modify and transform events. Common plugins: grok, json, mutate, drop, clone, geoip
  • outputs: required; they ship events elsewhere. Common plugins: elasticsearch, file, graphite, statsd, kafka

II. Deployment

Installing and starting Logstash is very simple, so only the key steps are covered here:

1. Deploy a JDK environment (omitted)

2. Install Logstash

① Download: https://artifacts.elastic.co/downloads/logstash/logstash-5.5.1.tar.gz

② Extract the archive

③ Start it:

nohup ./bin/logstash -f config/logstash.conf >/dev/null 2>&1 & 
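Before pointing Logstash at real data sources, it helps to verify the installation with a minimal configuration. The snippet below is only a sketch (assuming config/logstash.conf contains it, which is not part of the original setup): it reads lines typed on stdin and prints the parsed events to stdout. Logstash 5.x can also syntax-check a configuration first with bin/logstash -f config/logstash.conf -t (the --config.test_and_exit flag).

input {
    # read events from the terminal, one line per event (smoke test only)
    stdin { }
}
output {
    # pretty-print every field of each event
    stdout {
        codec => rubydebug
    }
}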

III. Common input configurations

1. Reading log files

input {
  file {
    path => ["/var/log/*.log", "/var/log/message"]
    type => "system"
    start_position => "beginning"
  }
}

2. Listening for Beats data

input {
    beats {
        port => 5044
    }
}

3. Reading data from Redis

input {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash-list"
  }
}

4. Reading data from Kafka

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
   kafka {
        bootstrap_servers =>  "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
}

On the network-management side we mainly use Kafka, so a few notes on this input:

  • Multiple blocks: if there are several log sources, i.e. several topics, you can declare one kafka block per topic
  • topics: each kafka block specifies the topic(s) it reads its logs from
  • group_id: multiple Logstash instances should use the same group_id, marking them as the same consumer group and avoiding duplicate consumption
  • codec: the data stored in Kafka is all JSON, so the json codec is used for decoding
  • add_field: a distinguishing field is added to each type of data here; the filter and output sections below rely on it (see the sketch after this list)
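Fields under [@metadata] are not shown in normal output and are never shipped by outputs, which is exactly what makes them convenient for routing. When debugging, a sketch like the one below (an assumed debugging snippet, not part of the production configuration) makes the metadata visible through the rubydebug codec's metadata option:

output {
    stdout {
        # metadata => true also prints [@metadata] fields, which are otherwise hidden
        codec => rubydebug {
            metadata => true
        }
    }
}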

More details: https://www.elastic.co/guide/en/logstash/5.5/input-plugins.html

IV. Common filter configurations

1. MySQL slow log

filter {
    # This is where the distinguishing field added earlier comes into play:
    if [@metadata][type] == "mysql_slow_log" {
        grok {
            # Regex match (slow-log formats may differ between MySQL versions; this pattern targets MySQL 5.5+)
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            # The host field in the slow log is a hostname rather than an IP, so work around it: add a server_ip field whose value is the name configured in Filebeat
            add_field => [ "server_ip", "%{[beat][name]}" ]
            # Tag events that matched
            add_tag => [ "matched" ]
        }
        # Drop anything that did not match
        if ("matched" not in [tags]) {
           drop {}
        }
        date {
            # Convert the slow-log timestamp into the event @timestamp
            match => [ "timestamp", "UNIX", "YYYY-MM-dd HH:mm:ss" ]
            remove_field => [ "timestamp" ]
        }
        # Compute an MD5 of the SQL (stored in the fingerprint field) so identical statements can be grouped together
        mutate {
            add_field => {"sql_hash" => "%{query}"}
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
              ]
        }
        fingerprint {
                # hash the normalized SQL; without "source" the plugin would hash the default "message" field instead
                source => "sql_hash"
                method => "MD5"
                key => ["sql_hash"]
        }
        # Remove fields we no longer need
        mutate {
            remove_field => "sql_hash"      
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "message"
        } 
     }
}

All the useful explanations are in the comments above, so read through them carefully.
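Incidentally, the "matched" tag plus conditional drop used above can also be expressed with grok's built-in failure tag: grok adds _grokparsefailure to [tags] whenever a pattern fails to match. The snippet below is only an equivalent sketch, not the configuration actually deployed here:

filter {
    grok {
        match => [ "message", "..." ]   # same slow-log pattern as above
    }
    # events that failed the grok match carry the _grokparsefailure tag
    if "_grokparsefailure" in [tags] {
        drop {}
    }
}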

2. Web access logs

filter {
    # Only process data tagged as web_access_log
    if [@metadata][type] == "web_access_log" {

        # To handle Chinese characters in paths, do a substitution on the raw data here
        mutate {
            gsub => ["message", "\\x", "\\\x"]
        }

        # Exclude HEAD requests
        if ( 'method":"HEAD' in [message] ) {
            drop {}
        }

        # The Nginx/Apache log format has already been customised to emit JSON, so simple handling is enough
        json {
            # parse the message field
            source => "message"
            # remove redundant fields
            remove_field => "message"
            remove_field => "[beat][hostname]"
            remove_field => "[beat][name]"
            remove_field => "@version"
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }
    }
}

3. System logs

Much the same as the examples above, so no further comments.

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            # Logstash 5.x requires the event.get/event.set API (the old event['field'] syntax no longer works)
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.getlocal))"
        }
        mutate {
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }
}

More details: https://www.elastic.co/guide/en/logstash/5.5/filter-plugins.html

V. Common output configurations

1. Print to the screen (DEBUG)

output {
    stdout{
        codec => rubydebug
    }
}

2. Shipping to Elasticsearch

output {
    elasticsearch {
        hosts => ["x.x.x.1:9200","x.x.x.2:9200","x.x.x.3:9200"]
        # Specify the index, partitioned by date
        index => "logstash-%{type}-%{+yyyy.MM.dd}"
        # Let Logstash manage (overwrite) the ES template
        template_overwrite => true
    }
}
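This example lets Logstash manage the ES index template itself (template_overwrite => true), whereas the examples further below set manage_template => false and rely on templates already stored in ES. For completeness, a sketch of loading a custom template file from Logstash could look like the following; the template path and name are placeholders, not values from the actual setup:

output {
    elasticsearch {
        hosts => ["x.x.x.1:9200"]
        index => "logstash-%{type}-%{+yyyy.MM.dd}"
        # install the mapping template from a local file and overwrite any existing one
        template => "/etc/logstash/templates/logstash-template.json"   # placeholder path
        template_name => "logstash-template"                           # placeholder name
        template_overwrite => true
    }
}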

3. Shipping to Kafka

output{
    kafka{
        topic_id => "hello"
        bootstrap_servers => "x.x.x.x:9092" # Kafka broker address
        batch_size => 5
    }
}

4. Configuration examples from the network-management side

output {
    # Ship system logs
    if [@metadata][type] == "messages" {   
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # Logstash does not manage the template; use the template already stored in ES
            manage_template => false
            template_name => "template-messages"
        }
    }
    # Ship MySQL slow logs
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    # Ship web access logs
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["eshost:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}

More details: https://www.elastic.co/guide/en/logstash/5.5/output-plugins.html

VI. Appendix

Finally, here is the complete Logstash configuration used on the network-management side, for reference only:

input {
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "messages"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "messages" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "mysql_slow_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "mysql_slow_log" }
    }
    kafka {
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        topics => "web_access_log"
        group_id => "logstash"
        codec => json {
            charset => "UTF-8"
        }
        add_field => { "[@metadata][type]" => "web_access_log" }
    }
}

filter {
    if [@metadata][type] == "messages" {
        grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              add_field => [ "ip", "%{[beat][name]}" ]
              add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }

        ruby {
            # Logstash 5.x event API (event.get/event.set); the old event['field'] form was removed
            code => "event.set('@timestamp', LogStash::Timestamp.new(event.get('@timestamp').time.getlocal))"
        }
        mutate {
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }

    if [@metadata][type] == "mysql_slow_log" {
        grok {
            match => [ "message", "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+\[(?:%{IP:client_ip})?\]\s*\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time
:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<query>(?<a
ction>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$"]
            add_field => [ "server_ip", "%{[beat][name]}" ]
            add_tag => [ "matched" ]
        }
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            match => [ "timestamp", "UNIX","YYYY-MM-dd HH:mm:ss"]
            remove_field => [ "timestamp" ]
        }
        mutate {
            add_field => {"sql_hash" => "%{query}"}
            gsub => [
                "sql_hash", "'.+?'", "",
                "sql_hash", "-?\d*\.{0,1}\d+", ""
              ]
        }
        fingerprint {
                # hash the normalized SQL rather than the default "message" field
                source => "sql_hash"
                method => "MD5"
                key => ["sql_hash"]
        }
        mutate {
            remove_field => "sql_hash"      
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "message"
        } 
     }

    if [@metadata][type] == "web_access_log" {
      mutate {  
        gsub => ["message", "\\x", "\\\x"]
      }
      json {
            source => "message"
            remove_field => "message"          
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }
    }
}

output {
    if [@metadata][type] == "messages" {   
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-messages"
        }
    }
    if [@metadata][type] == "mysql_slow_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "mysqlslowlog-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-mysqlslowlog"
        }
    }
    if [@metadata][type] == "web_access_log" {
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "web_access_log-%{+YYYY.MM.dd}"
            manage_template => false
            template_name => "template-web_access_log"
        }
    }
}

Original-content statement: this article was published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
