【ES私房菜】收集 Linux 系统日志

ES环境已经就绪,我们需要先调通一类简单日志作为数据上报全链路的试运行。恰好Linux系统日志就是这样一个角色,随处可见,风险可控。

这里,我们选择了2种Linux系统日志:/var/log/messages 和 /var/log/cron。

一、部署Filebeat

按照《【ES私房菜】Filebeat安装部署及配置详解》在需要采集系统日志的系统上部署filebeat,然后编写如下配置:

vim filebeat.yml

############################# input #########################################
filebeat.prospectors:
- input_type: log
  paths: /var/log/messages
  paths: /var/log/cron
  document_type: "messages"

spool_size: 1024
idle_timeout: "5s"
name: 172.16.x.xxx

############################# Kafka #########################################
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["x.x.x.1:9092","x.x.x.2:9092","x.x.x.3:9092"]
  # message topic selection + partitioning
  topic: '%{[type]}'
  flush_interval: 1s
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: /data/filebeat/logs
  name: filebeat.log
  keepfiles: 7

############################# console #########################################
#For Debug
output.console:
  # Boolean flag to enable or disable the output module.
  enabled: true
  # Pretty print json event
  pretty: true

这里我们先把 console 输出打开,然后启动filebeat观察输出:

{
  "@timestamp": "2017-09-15T01:07:02.931Z",
  "beat": {
    "hostname": "TENCENT64.site",
    "name": "172.16.x.xxx",
    "version": "5.5.1"
  },
  "input_type": "log",
  "message": "Sep 15 09:07:01 TENCENT64 CROND[815]: (root) CMD (/usr/local/sa/agent/secu-tcs-agent-mon-safe.sh  \u003e /dev/null 2\u003e\u00261)",
  "offset": 12991588,
  "source": "/var/log/cron",
  "type": "messages"
}

可以看到输出了如上的json格式数据,这些数据会原样上报kafka或logstash。

二、配置template

在数据上报ES之前,我们还需要先配置下相应的ES template。

当然,如果不想配置也可以,在后文的 logstash 的 output 模块中去掉【禁止管理模板的配置】即可,也就是让 logstash 自适应导入 template。

但是这样有个弊端,因为 logstash 是自适应匹配模板,可能有一些字段类型就不是那么准确,导致我们后面在Kibana里面就无法对一些字段进行聚合分析了。

根据messages字段内容,编写如下template:

{
    "template": "messages-*",
    "mappings": {
      "log": {
        "properties": {
          "@timestamp": {
            "include_in_all" : false,
            "type" : "date"
          },
          "ip": {
            "index": "not_analyzed",
            "type": "ip"
          },
          "hostname": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message": {
            "type": "string"
          },
          "message_content": {
            "type": "string"
          },
          "message_pid": {
            "type": "string"
          },
          "message_program": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message_timestramp": {
           "include_in_all" : false,
            "type" : "date"
          }
        }
      }
    }
}

Ps:这里就不详细说明字段的含义了,请参考系列文章《ElastiSearch template简介(整理中)》.

将上述模板保存为 messages.json 的文件,然后执行如下命令进行导入:

curl -XPUT http://x.x.x.x:9200/_template/template-messages -d @messages.json
  • 主机为ES地址和端口
  • _template 表示模板方法
  • template-messages 是我们给这个模板定义的名字
  • -d @模板文件,表示将这个模板文件导入到ES

正常将返回如下结果:

{
  "acknowledged" : true
}

三、配置logstash

vim logstash.conf

input {
    kafka {
        # 指定kafka服务IP和端口(老版本是指定zookeeper服务,这里有所区别)
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        # 取出 messages 数据
        topics => "messages"
        # 指定多个logstash为相同的 group_id,协同工作
        group_id => "logstash"
        # kafka 取出来的数据需要格式化为json
        codec => json {
            charset => "UTF-8"
        }
        # 加一个标签字段,用于后面判断
        add_field => { "[@metadata][type]" => "messages" }
    }
}
filter {
    # 根据标签来处理messages数据
    if [@metadata][type] == "messages" {
        grok {
              # 正则匹配
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              # 亮点:由于源消息是没有上报IP的(只有主机名)这里做了点改造,将filebeat传来的name赋值给ip这个字段
              add_field => [ "ip", "%{[beat][name]}" ]
              # 如果成功匹配,则加一个匹配标签
              add_tag => [ "matched" ]
        }
        # 如果标签不存在则丢弃这条消息
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            # 对日志里面的时间戳进行格式转换,适配ES的时间格式
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }
        ruby {
            code => "event['@timestamp'] = event['@timestamp'].getlocal"
        }
        mutate {
            # 移除不需要的字段
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }
output {
    # 上报系统日志
    if [@metadata][type] == "messages" {   
        # stdout { codec => rubydebug } # 信息打屏,用于DEBUG
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # 禁止logstash管理模板,并指定上文上传的模板
            manage_template => false
            template_name => "template-messages"
        }
    }
}

因为数据格式的不同,grok正则表达式也要做相应的适配,这个过程应该是配置logstash最麻烦的地方。当然,这里我们可以用到 grok 的在线测试工具:http://grokdebug.herokuapp.com/

同样的,我们在配置完logstash之后,先屏蔽 elasticsearch 的上报,然后打开stdout:

stdout { codec => rubydebug }

进行调试,直到打屏数据满足我们的需求后再上报至ES。

四、配置Kibana

有了数据之后,首次打开Kibana发现啥也看不到,此时我们需要先配置下Kibana的index,图解如下:

①、如图打开索引管理:

②、如图点击创建索引:

③、如图输入logstash指定的索引前缀,自动带出字段后选择时间戳字段,点击【Create】即可:

最后,回到Discover界面就能看到期待已久的高清美图了:

本文就介绍这么多,更多Kibana的奇淫巧计请关注《ES私房菜系列文章之教你玩转Kibana(整理中)》。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏鬼谷君

Django项目中使用celery做异步任务

在写项目过程中经常会遇到一些耗时的任务, 比如:发送邮件、发送短信等等~。这些操作如果都同步执行耗时长对用户体验不友好,在这种情况下就可以把任务放在后台异步执行...

861
来自专栏Core Net

SharePoint 2016 修改左上角连接

2756
来自专栏自动化测试实战

Appium+python (3) 异常处理

2885
来自专栏python3

diango使用数据库

之前写的页面,虽然和用户交互得很好,但并没有保存任何数据,页面一旦关闭,或服务器重启,一切都将回到原始状态。

923
来自专栏Seebug漏洞平台

WinDbg 漏洞分析调试(一)

0x00 引子 最近开始要在部门内进行 WinDbg 漏洞分析方面的专题showcase,打算将每次分享的内容整理成文章,希望能写一个系列。另外,鉴于笔者还在学...

3144
来自专栏小灰灰

QuickTask动态脚本支持框架整体介绍篇

一个简单的动态脚本调度框架,支持运行时,实时增加,删除和修改动态脚本,可用于后端的进行接口验证、数据订正,执行定时任务或校验脚本

982
来自专栏linux驱动个人学习

cyclictest 简介

1. cyclictest 简介以及安装 1.1 cyclictest 简介       cyclictest 是什么? 看名字应该就能大致猜出来它是一种 te...

3964
来自专栏向治洪

React Native热更新方案

随着 React Native 的不断发展完善,越来越多的公司选择使用 React Native 替代 iOS/Android 进行部分业务线的开发,也有不少使...

1.7K7
来自专栏芋道源码1024

Java 工程师居家必备的 Intellij IDEA Top10 插件

支持lombok的各种注解,从此不用写getter setter这些 可以把注解还原为原本的java代码 非常方便

1175
来自专栏Android开发小工

你真的知道APP缓存目录的内幕吗

在使用外部存储执行任何工作之前,应始终调用 getExternalStorageState() 以检查介质是否可用。介质可能已装载到计算机,处于缺失、只读或其他...

904

扫码关注云+社区