前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【ES私房菜】收集 Linux 系统日志

【ES私房菜】收集 Linux 系统日志

原创
作者头像
张戈
修改2017-10-12 16:33:43
4.1K0
修改2017-10-12 16:33:43
举报
文章被收录于专栏:张戈的专栏张戈的专栏

ES环境已经就绪,我们需要先调通一类简单日志作为数据上报全链路的试运行。恰好Linux系统日志就是这样一个角色,随处可见,风险可控。

这里,我们选择了2种Linux系统日志:/var/log/messages 和 /var/log/cron。

一、部署Filebeat

按照《【ES私房菜】Filebeat安装部署及配置详解》在需要采集系统日志的系统上部署filebeat,然后编写如下配置:

vim filebeat.yml

代码语言:txt
复制
############################# input #########################################
filebeat.prospectors:
- input_type: log
  paths: /var/log/messages
  paths: /var/log/cron
  document_type: "messages"

spool_size: 1024
idle_timeout: "5s"
name: 172.16.x.xxx

############################# Kafka #########################################
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["x.x.x.1:9092","x.x.x.2:9092","x.x.x.3:9092"]
  # message topic selection + partitioning
  topic: '%{[type]}'
  flush_interval: 1s
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: /data/filebeat/logs
  name: filebeat.log
  keepfiles: 7

############################# console #########################################
#For Debug
output.console:
  # Boolean flag to enable or disable the output module.
  enabled: true
  # Pretty print json event
  pretty: true

这里我们先把 console 输出打开,然后启动filebeat观察输出:

代码语言:txt
复制
{
  "@timestamp": "2017-09-15T01:07:02.931Z",
  "beat": {
    "hostname": "TENCENT64.site",
    "name": "172.16.x.xxx",
    "version": "5.5.1"
  },
  "input_type": "log",
  "message": "Sep 15 09:07:01 TENCENT64 CROND[815]: (root) CMD (/usr/local/sa/agent/secu-tcs-agent-mon-safe.sh  \u003e /dev/null 2\u003e\u00261)",
  "offset": 12991588,
  "source": "/var/log/cron",
  "type": "messages"
}

可以看到输出了如上的json格式数据,这些数据会原样上报kafka或logstash。

二、配置template

在数据上报ES之前,我们还需要先配置下相应的ES template。

当然,如果不想配置也可以,在后文的 logstash 的 output 模块中去掉【禁止管理模板的配置】即可,也就是让 logstash 自适应导入 template。

但是这样有个弊端,因为 logstash 是自适应匹配模板,可能有一些字段类型就不是那么准确,导致我们后面在Kibana里面就无法对一些字段进行聚合分析了。

根据messages字段内容,编写如下template:

代码语言:txt
复制
{
    "template": "messages-*",
    "mappings": {
      "log": {
        "properties": {
          "@timestamp": {
            "include_in_all" : false,
            "type" : "date"
          },
          "ip": {
            "index": "not_analyzed",
            "type": "ip"
          },
          "hostname": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message": {
            "type": "string"
          },
          "message_content": {
            "type": "string"
          },
          "message_pid": {
            "type": "string"
          },
          "message_program": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message_timestramp": {
           "include_in_all" : false,
            "type" : "date"
          }
        }
      }
    }
}

Ps:这里就不详细说明字段的含义了,请参考系列文章《ElastiSearch template简介(整理中)》.

将上述模板保存为 messages.json 的文件,然后执行如下命令进行导入:

代码语言:txt
复制
curl -XPUT http://x.x.x.x:9200/_template/template-messages -d @messages.json
  • 主机为ES地址和端口
  • _template 表示模板方法
  • template-messages 是我们给这个模板定义的名字
  • -d @模板文件,表示将这个模板文件导入到ES

正常将返回如下结果:

代码语言:txt
复制
{
  "acknowledged" : true
}

三、配置logstash

vim logstash.conf

代码语言:txt
复制
input {
    kafka {
        # 指定kafka服务IP和端口(老版本是指定zookeeper服务,这里有所区别)
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        # 取出 messages 数据
        topics => "messages"
        # 指定多个logstash为相同的 group_id,协同工作
        group_id => "logstash"
        # kafka 取出来的数据需要格式化为json
        codec => json {
            charset => "UTF-8"
        }
        # 加一个标签字段,用于后面判断
        add_field => { "[@metadata][type]" => "messages" }
    }
}
filter {
    # 根据标签来处理messages数据
    if [@metadata][type] == "messages" {
        grok {
              # 正则匹配
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              # 亮点:由于源消息是没有上报IP的(只有主机名)这里做了点改造,将filebeat传来的name赋值给ip这个字段
              add_field => [ "ip", "%{[beat][name]}" ]
              # 如果成功匹配,则加一个匹配标签
              add_tag => [ "matched" ]
        }
        # 如果标签不存在则丢弃这条消息
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            # 对日志里面的时间戳进行格式转换,适配ES的时间格式
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }
        ruby {
            code => "event['@timestamp'] = event['@timestamp'].getlocal"
        }
        mutate {
            # 移除不需要的字段
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }
output {
    # 上报系统日志
    if [@metadata][type] == "messages" {   
        # stdout { codec => rubydebug } # 信息打屏,用于DEBUG
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # 禁止logstash管理模板,并指定上文上传的模板
            manage_template => false
            template_name => "template-messages"
        }
    }
}

因为数据格式的不同,grok正则表达式也要做相应的适配,这个过程应该是配置logstash最麻烦的地方。当然,这里我们可以用到 grok 的在线测试工具:http://grokdebug.herokuapp.com/

同样的,我们在配置完logstash之后,先屏蔽 elasticsearch 的上报,然后打开stdout:

代码语言:txt
复制
stdout { codec => rubydebug }

进行调试,直到打屏数据满足我们的需求后再上报至ES。

四、配置Kibana

有了数据之后,首次打开Kibana发现啥也看不到,此时我们需要先配置下Kibana的index,图解如下:

①、如图打开索引管理:

[1506566531816_2063_1506566523287.png]
[1506566531816_2063_1506566523287.png]

②、如图点击创建索引:

[1506566545527_1359_1506566536920.png]
[1506566545527_1359_1506566536920.png]

③、如图输入logstash指定的索引前缀,自动带出字段后选择时间戳字段,点击【Create】即可:

[1506566564970_6945_1506566556374.png]
[1506566564970_6945_1506566556374.png]

最后,回到Discover界面就能看到期待已久的高清美图了:

[image.png]
[image.png]

本文就介绍这么多,更多Kibana的奇淫巧计请关注《ES私房菜系列文章之教你玩转Kibana(整理中)》。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、部署Filebeat
  • 二、配置template
  • 三、配置logstash
  • 四、配置Kibana
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档