
Extracting metrics from logs with Vector

Original | Author: 保持热爱奔赴山海 | Last modified: 2025-05-14 08:12:58

In log-collection scenarios, it is sometimes useful to extract key metrics from the logs during collection itself, so that alerts can fire promptly and metrics can be aggregated.

One way to do this is to use Filebeat to collect the log files and ship them to Kafka, then run two Flink streams against the Kafka topic: one stream computes the keyword metrics, while the other batches the logs into ES (or an OLAP database such as ClickHouse or StarRocks).

Alternatively, Vector (from Datadog) can accomplish the same thing.

The concrete steps follow.

1. Mock log data

> vim /tmp/template.json and write a few test records (the two records below are identical except for cacheStatus: "0" marks a cache miss, "1" a cache hit):

{ "version": 1, "cp": "123456", "reqId": "1239f220", "reqTimeSec": "1573840000", "bytes": "4995", "cliIP": "128.147.28.68", "statusCode": "206", "proto": "HTTPS", "reqHost": "test.hostname.net", "reqMethod": "GET", "reqPath": "/path1/path2/file.ext", "reqPort": "443", "rspContentLen": "5000", "rspContentType": "text/html", "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1", "objSize": "484", "uncompressedSize": "484", "overheadBytes": "232", "totalBytes": "0", "queryStr": "param=value", "accLang": "en-US", "cookie": "cookie-content", "range": "37334-42356", "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest", "xForwardedFor": "8.47.28.38", "maxAgeSec": "3600", "reqEndTimeMSec": "3", "errorCode": "ERR_ACCESS_DENIED|fwd_acl", "turnAroundTimeMSec": "11", "transferTimeMSec": "125", "dnsLookupTimeMSec": "50", "customField": "any-custom-value", "cacheStatus": "0", "country": "US", "city": "HERNDON" }
{ "version": 1, "cp": "123456", "reqId": "1239f220", "reqTimeSec": "1573840000", "bytes": "4995", "cliIP": "128.147.28.68", "statusCode": "206", "proto": "HTTPS", "reqHost": "test.hostname.net", "reqMethod": "GET", "reqPath": "/path1/path2/file.ext", "reqPort": "443", "rspContentLen": "5000", "rspContentType": "text/html", "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1", "objSize": "484", "uncompressedSize": "484", "overheadBytes": "232", "totalBytes": "0", "queryStr": "param=value", "accLang": "en-US", "cookie": "cookie-content", "range": "37334-42356", "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest", "xForwardedFor": "8.47.28.38", "maxAgeSec": "3600", "reqEndTimeMSec": "3", "errorCode": "ERR_ACCESS_DENIED|fwd_acl", "turnAroundTimeMSec": "11", "transferTimeMSec": "125", "dnsLookupTimeMSec": "50", "customField": "any-custom-value", "cacheStatus": "1", "country": "US", "city": "HERNDON" }

2. Edit the Vector configuration files

> cat config/main.toml

data_dir = "/var/lib/vector"

[api]
enabled = true
address = "127.0.0.1:8686"
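The [api] section enables Vector's local GraphQL API on 127.0.0.1:8686. It is not needed by the pipeline itself, but once Vector is running (step 3) it lets you watch per-component event throughput with the bundled vector top command:

> ./bin/vector top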

> cat config/test.toml

[sources.app_json_log]
  type = "file"
  include = [ "/tmp/a.json" ]
  glob_minimum_cooldown_ms = 10000  # interval between scans for new log files: 10s
  ignore_older = 86400  # ignore files older than 1 day (seconds)
  max_line_bytes = 102400   # discard any single line longer than 102400 bytes (~100 KiB)
  read_from = "beginning"

[transforms.parsing]
  type = "remap" # required
  inputs = ["app_json_log"] # required
  source = '''
  . = parse_json!(string!(.message))
  .timestamp = now()
  .agentid = "123456789"
  del(.not_used_fieldA)
  '''
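# Note on the remap program above: the "!" function variants (string!, parse_json!)
# abort the VRL program for any event whose message is not valid JSON; by default
# (drop_on_error = false) such an event then continues downstream unmodified,
# so malformed lines simply never match the cacheStatus filters below.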

# processing pipeline for cache_misses
[transforms.cache_misses]
  type = "filter" # required
  inputs = ["parsing"] # required
  # more complex conditions are possible, e.g. condition = '.tlsVersion == "TLSv1" && .reqMethod == "GET"'
  # for details on condition, see https://vector.dev/docs/reference/configuration/transforms/filter/#condition
  condition = '.cacheStatus == "0"'
[transforms.logs2metrics-cache_misses]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_misses"] # required
  # Metrics
  [[transforms.logs2metrics-cache_misses.metrics]]
    # resulting metric key: website_cache_misses (namespace + "_" + name); the counter increments by 1 per matching event
    field = "cacheStatus" # required
    name = "cache_misses" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required

# processing pipeline for cache_hits
[transforms.cache_hits]
  type = "filter" # required
  inputs = ["parsing"] # required
  condition = '.cacheStatus == "1"'
[transforms.logs2metrics-cache_hits]
  # General
  type = "log_to_metric" # required
  inputs = ["cache_hits"] # required
  # Metrics
  [[transforms.logs2metrics-cache_hits.metrics]]
    # resulting metric key: website_cache_hits (namespace + "_" + name)
    field = "cacheStatus" # required
    name = "cache_hits" # optional, no default
    namespace = "website" # optional, no default
    type = "counter" # required

# expose the computed metrics on a Prometheus-compatible endpoint
[sinks.prometheus_exporter]
  type = "prometheus_exporter" # required
  inputs = ["logs2metrics*"] # required; the wildcard matches every upstream component whose name starts with logs2metrics
  address = "0.0.0.0:9080" # required

# write the parsed logs to ES
[sinks.es]
inputs = [ "parsing" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]

  [sinks.es.batch]
  max_events = 1000
  timeout_secs = 2

  [sinks.es.buffer]
  type = "disk"
  max_size = 536870976
  when_full = "block"

  [sinks.es.bulk]
  index = "vector-new-%Y.%m.%d"

The pipeline above is simplified for demonstration purposes and writes straight to ES.

In production, a Vector agent on the edge node usually writes the logs to Kafka, and a separate Vector instance consumes from Kafka and writes to ES.
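A minimal sketch of that split is shown below; the broker address kafka.internal:9092, the topic app-logs, and the consumer group vector-es-writer are placeholders, not values from this setup. The edge instance keeps the file source and the parsing transform; the aggregation instance keeps the elasticsearch sink.

# edge node: ship parsed logs to Kafka instead of ES (placeholder broker/topic)
[sinks.to_kafka]
  type = "kafka"
  inputs = ["parsing"]
  bootstrap_servers = "kafka.internal:9092"
  topic = "app-logs"
  encoding.codec = "json"

# aggregation node: consume the topic and feed an es sink like the one above
[sources.from_kafka]
  type = "kafka"
  bootstrap_servers = "kafka.internal:9092"
  group_id = "vector-es-writer"
  topics = ["app-logs"]
  decoding.codec = "json"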

3. Start Vector in the foreground

./bin/vector -c config/main.toml -c config/test.toml
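If startup fails, the configuration files can be checked with Vector's built-in validator, which verifies syntax and component wiring without starting the pipeline:

> ./bin/vector validate config/main.toml config/test.toml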

4. Generate data in batches

for i in {1..5}; do cat /tmp/template.json >> /tmp/a.json ; done  # each iteration appends one cache-hit and one cache-miss record

5. Check the metrics

curl 127.0.0.1:9080/metrics

The output looks similar to this:
# HELP website_cache_hits cache_hits
# TYPE website_cache_hits counter
website_cache_hits 10 1747119770005
# HELP website_cache_misses cache_misses
# TYPE website_cache_misses counter
website_cache_misses 10 1747119770005

6. Check the data in ES

> curl -s 127.0.0.1:9200/_cat/indices | grep vector-new

> curl -s 127.0.0.1:9200/vector-new-2025.05.13/_search | jq .
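To simply confirm how many documents were indexed, the _count API works as well (the index name follows the vector-new-%Y.%m.%d pattern set in [sinks.es.bulk]):

> curl -s 127.0.0.1:9200/vector-new-2025.05.13/_count | jq .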

7. Scrape the metrics with Prometheus

Steps omitted here.

Reference: https://xie.infoq.cn/article/7d647a5d5502cb0d7478aa56b
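For completeness, a minimal Prometheus scrape job for the exporter above could look like the following (the job name is arbitrary; the target matches the prometheus_exporter address from step 2):

scrape_configs:
  - job_name: "vector"
    static_configs:
      - targets: ["127.0.0.1:9080"]

Once scraped, a cache-miss ratio can be derived with a PromQL query such as rate(website_cache_misses[5m]) / (rate(website_cache_misses[5m]) + rate(website_cache_hits[5m])).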

Original-content statement: this article was published to the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

For infringement concerns, please contact cloudcommunity@tencent.com for removal.

