专栏首页张戈的专栏【ES私房菜】收集 Linux 系统日志
原创

【ES私房菜】收集 Linux 系统日志

ES环境已经就绪,我们需要先调通一类简单日志作为数据上报全链路的试运行。恰好Linux系统日志就是这样一个角色,随处可见,风险可控。

这里,我们选择了2种Linux系统日志:/var/log/messages 和 /var/log/cron。

一、部署Filebeat

按照《【ES私房菜】Filebeat安装部署及配置详解》在需要采集系统日志的系统上部署filebeat,然后编写如下配置:

vim filebeat.yml

############################# input #########################################
filebeat.prospectors:
- input_type: log
  paths: /var/log/messages
  paths: /var/log/cron
  document_type: "messages"

spool_size: 1024
idle_timeout: "5s"
name: 172.16.x.xxx

############################# Kafka #########################################
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["x.x.x.1:9092","x.x.x.2:9092","x.x.x.3:9092"]
  # message topic selection + partitioning
  topic: '%{[type]}'
  flush_interval: 1s
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

############################# Logging #########################################
logging.level: info
logging.to_files: true
logging.to_syslog: false
logging.files:
  path: /data/filebeat/logs
  name: filebeat.log
  keepfiles: 7

############################# console #########################################
#For Debug
output.console:
  # Boolean flag to enable or disable the output module.
  enabled: true
  # Pretty print json event
  pretty: true

这里我们先把 console 输出打开,然后启动filebeat观察输出:

{
  "@timestamp": "2017-09-15T01:07:02.931Z",
  "beat": {
    "hostname": "TENCENT64.site",
    "name": "172.16.x.xxx",
    "version": "5.5.1"
  },
  "input_type": "log",
  "message": "Sep 15 09:07:01 TENCENT64 CROND[815]: (root) CMD (/usr/local/sa/agent/secu-tcs-agent-mon-safe.sh  \u003e /dev/null 2\u003e\u00261)",
  "offset": 12991588,
  "source": "/var/log/cron",
  "type": "messages"
}

可以看到输出了如上的json格式数据,这些数据会原样上报kafka或logstash。

二、配置template

在数据上报ES之前,我们还需要先配置下相应的ES template。

当然,如果不想配置也可以,在后文的 logstash 的 output 模块中去掉【禁止管理模板的配置】即可,也就是让 logstash 自适应导入 template。

但是这样有个弊端,因为 logstash 是自适应匹配模板,可能有一些字段类型就不是那么准确,导致我们后面在Kibana里面就无法对一些字段进行聚合分析了。

根据messages字段内容,编写如下template:

{
    "template": "messages-*",
    "mappings": {
      "log": {
        "properties": {
          "@timestamp": {
            "include_in_all" : false,
            "type" : "date"
          },
          "ip": {
            "index": "not_analyzed",
            "type": "ip"
          },
          "hostname": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message": {
            "type": "string"
          },
          "message_content": {
            "type": "string"
          },
          "message_pid": {
            "type": "string"
          },
          "message_program": {
            "index": "not_analyzed",
            "type": "string"
          },
          "message_timestramp": {
           "include_in_all" : false,
            "type" : "date"
          }
        }
      }
    }
}

Ps:这里就不详细说明字段的含义了,请参考系列文章《ElastiSearch template简介(整理中)》.

将上述模板保存为 messages.json 的文件,然后执行如下命令进行导入:

curl -XPUT http://x.x.x.x:9200/_template/template-messages -d @messages.json
  • 主机为ES地址和端口
  • _template 表示模板方法
  • template-messages 是我们给这个模板定义的名字
  • -d @模板文件,表示将这个模板文件导入到ES

正常将返回如下结果:

{
  "acknowledged" : true
}

三、配置logstash

vim logstash.conf

input {
    kafka {
        # 指定kafka服务IP和端口(老版本是指定zookeeper服务,这里有所区别)
        bootstrap_servers => "x.x.x.1:9092,x.x.x.2:9092,x.x.x.3:9092"
        # 取出 messages 数据
        topics => "messages"
        # 指定多个logstash为相同的 group_id,协同工作
        group_id => "logstash"
        # kafka 取出来的数据需要格式化为json
        codec => json {
            charset => "UTF-8"
        }
        # 加一个标签字段,用于后面判断
        add_field => { "[@metadata][type]" => "messages" }
    }
}
filter {
    # 根据标签来处理messages数据
    if [@metadata][type] == "messages" {
        grok {
              # 正则匹配
              match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:mess
age_content}" }
              # 亮点:由于源消息是没有上报IP的(只有主机名)这里做了点改造,将filebeat传来的name赋值给ip这个字段
              add_field => [ "ip", "%{[beat][name]}" ]
              # 如果成功匹配,则加一个匹配标签
              add_tag => [ "matched" ]
        }
        # 如果标签不存在则丢弃这条消息
        if ("matched" not in  [tags]) {
           drop {}
        }
        date {
            # 对日志里面的时间戳进行格式转换,适配ES的时间格式
            locale => "en_US"
            timezone => "Asia/Shanghai"
            match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
            target => "@timestamp"
        }
        ruby {
            code => "event['@timestamp'] = event['@timestamp'].getlocal"
        }
        mutate {
            # 移除不需要的字段
            remove_field => "[beat][hostname]"      
            remove_field => "[beat][name]"      
            remove_field => "@version"      
            remove_field => "[beat][version]"
            remove_field => "input_type"
            remove_field => "offset"
            remove_field => "tags"
            remove_field => "type"
            remove_field => "host"
        }  
    }
output {
    # 上报系统日志
    if [@metadata][type] == "messages" {   
        # stdout { codec => rubydebug } # 信息打屏,用于DEBUG
        elasticsearch {
            hosts => ["x.x.x.x:9200"]
            index => "messages-%{+YYYY.MM.dd}"
            # 禁止logstash管理模板,并指定上文上传的模板
            manage_template => false
            template_name => "template-messages"
        }
    }
}

因为数据格式的不同,grok正则表达式也要做相应的适配,这个过程应该是配置logstash最麻烦的地方。当然,这里我们可以用到 grok 的在线测试工具:http://grokdebug.herokuapp.com/

同样的,我们在配置完logstash之后,先屏蔽 elasticsearch 的上报,然后打开stdout:

stdout { codec => rubydebug }

进行调试,直到打屏数据满足我们的需求后再上报至ES。

四、配置Kibana

有了数据之后,首次打开Kibana发现啥也看不到,此时我们需要先配置下Kibana的index,图解如下:

①、如图打开索引管理:

②、如图点击创建索引:

③、如图输入logstash指定的索引前缀,自动带出字段后选择时间戳字段,点击【Create】即可:

最后,回到Discover界面就能看到期待已久的高清美图了:

本文就介绍这么多,更多Kibana的奇淫巧计请关注《ES私房菜系列文章之教你玩转Kibana(整理中)》。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 结合VBS,实现批处理自动以管理员身份执行

    这是我在百度回答知友时写的,用于自动以管理员身份执行命令或程序→百度原址 批处理代码,功能:自动以管理员身份执行 test.exe: start admin.v...

    张戈
  • 【 ES 私房菜】ElasticSearch 详细部署教程

    Elasticsearch 是一个分布式的 RESTful 风格的搜索和数据分析引擎,能够解决不断涌现出的各种用例。本文详细介绍了elasticserch的部署...

    张戈
  • WordPress高亮插件:Crayon Syntax Highlighter加载优化

    Crayon Syntax Highlighter 是我这种代码控的必装插件。但是,这款插件也有一些小缺憾,比如体积大、拖慢速度、容易产生冲突等。很多博主就是因...

    张戈
  • 声明性数据基础设施为数据驱动企业提供动力

    大数据、人工智能/ML和现代分析技术已经渗透到商业世界,成为企业战略的关键元素,以更好地服务客户、更快地创新和保持领先的竞争。数据是所有这些的核心。在本博客中,...

    CNCF
  • Flutter 横向滚动标签

    要实现上面的效果,每个种类的标签横向滚动,实现的方式,最外层的大分类标签一个ListView,每个分类的标签也是ListView 设置横向滚动结合Wrap组件就...

    赵哥窟
  • 人工智能训练使用视网膜扫描发现心脏病风险

    谷歌及其子公司Verily的科学家们发现了一种利用机器学习来评估一个人患心脏病风险的新方法。通过分析病人眼睛后部的扫描结果,该公司的软件能够准确地推断出数据,包...

    人工智能快报
  • kubernetesv1.17集群生态搭建笔记

    上一次接触到kubernetes集群的时候已经是一年以前了,那个时候官方的版本还只是v1.10,而现在过去一年的时间了,官方版本已经快速的迭代到了v1.17了,...

    云爬虫技术研究笔记
  • 深度 | 我国医疗大数据技术的发展趋势

    目前我国的医疗行业现状是:优质医疗资源集中在大城市,地方以及偏远地区医疗条件较差,医疗资源的配置不合理,导致了大量的长尾需求,催生了广阔的互联网医疗市场。在此...

    小莹莹
  • 58到家MQ如何快速实现流量削峰填谷

    问:为什么会有本文? 答:上一篇文章《到底什么时候该使用MQ?》引起了广泛的讨论,有朋友回复说,MQ的还有一个典型应用场景是缓冲流量,削峰填谷,本文将简单介绍下...

    架构师之路
  • 目标检测--Beyond Skip Connections: Top-Down Modulation for Object Detection

    Beyond Skip Connections: Top-Down Modulation for Object Detection CVPR2017 und...

    用户1148525

扫码关注云+社区

领取腾讯云代金券