In log-collection scenarios you sometimes want to extract certain key metrics from the logs while they are being collected, so that you can alert on them or feed them into metrics in a timely way.
A common way to do this is to ship the log files to Kafka with Filebeat, then consume the Kafka topic with two Flink streams: one stream computes the keyword/metric statistics, while the other batches the raw logs into Elasticsearch (or an OLAP database such as ClickHouse or StarRocks).
Datadog's Vector can also implement this pipeline on its own.
The concrete steps are shown below.
> vim /tmp/template.json   # write a couple of test records into this file
{ "version": 1, "cp": "123456", "reqId": "1239f220", "reqTimeSec": "1573840000", "bytes": "4995", "cliIP": "128.147.28.68", "statusCode": "206", "proto": "HTTPS", "reqHost": "test.hostname.net", "reqMethod": "GET", "reqPath": "/path1/path2/file.ext", "reqPort": "443", "rspContentLen": "5000", "rspContentType": "text/html", "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1", "objSize": "484", "uncompressedSize": "484", "overheadBytes": "232", "totalBytes": "0", "queryStr": "param=value", "accLang": "en-US", "cookie": "cookie-content", "range": "37334-42356", "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest", "xForwardedFor": "8.47.28.38", "maxAgeSec": "3600", "reqEndTimeMSec": "3", "errorCode": "ERR_ACCESS_DENIED|fwd_acl", "turnAroundTimeMSec": "11", "transferTimeMSec": "125", "dnsLookupTimeMSec": "50", "customField": "any-custom-value", "cacheStatus": "0", "country": "US", "city": "HERNDON" }
{ "version": 1, "cp": "123456", "reqId": "1239f220", "reqTimeSec": "1573840000", "bytes": "4995", "cliIP": "128.147.28.68", "statusCode": "206", "proto": "HTTPS", "reqHost": "test.hostname.net", "reqMethod": "GET", "reqPath": "/path1/path2/file.ext", "reqPort": "443", "rspContentLen": "5000", "rspContentType": "text/html", "UA": "Mozilla%2F5.0+%28Macintosh%3B+Intel+Mac+OS+X+10_14_3%29", "tlsOverheadTimeMSec": "0", "tlsVersion": "TLSv1", "objSize": "484", "uncompressedSize": "484", "overheadBytes": "232", "totalBytes": "0", "queryStr": "param=value", "accLang": "en-US", "cookie": "cookie-content", "range": "37334-42356", "referer": "https%3A%2F%2Ftest.referrer.net%2Fen-US%2Fdocs%2FWeb%2Ftest", "xForwardedFor": "8.47.28.38", "maxAgeSec": "3600", "reqEndTimeMSec": "3", "errorCode": "ERR_ACCESS_DENIED|fwd_acl", "turnAroundTimeMSec": "11", "transferTimeMSec": "125", "dnsLookupTimeMSec": "50", "customField": "any-custom-value", "cacheStatus": "1", "country": "US", "city": "HERNDON" }> cat config/main.toml
data_dir = "/var/lib/vector"
[api]
enabled = true
address = "127.0.0.1:8686"cat config/test.toml
[sources.app_json_log]
type = "file"
include = [ "/tmp/a.json" ]
glob_minimum_cooldown_ms = 10000 # interval between scans for new log files: 10s
ignore_older = 86400 # 1 day
max_line_bytes = 102400 # lines longer than 102400 bytes (~100 KiB) are discarded
read_from = "beginning"
[transforms.parsing]
type = "remap" # required
inputs = ["app_json_log"] # required
source = '''
. = parse_json!(string!(.message))
.timestamp = now()
.agentid = "123456789"
del(.not_used_fieldA)
'''
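To sanity-check the VRL program above before running the full pipeline, Vector's unit-test feature ([[tests]]) can be used. Below is a minimal sketch, assuming a reasonably recent Vector release; the test name and the sample message are made up for illustration, and the block can be appended to test.toml and removed once verified.
# Unit test for the "parsing" transform (illustrative; remove after verifying)
[[tests]]
name = "parsing-produces-expected-fields"

[[tests.inputs]]
insert_at = "parsing"          # feed the sample event into the remap transform
type = "log"
[tests.inputs.log_fields]
message = '{ "cacheStatus": "0", "reqMethod": "GET" }'

[[tests.outputs]]
extract_from = "parsing"       # inspect what comes out of the transform
[[tests.outputs.conditions]]
type = "vrl"
source = '.cacheStatus == "0" && .agentid == "123456789" && exists(.timestamp)'
Run it with:
> ./bin/vector test config/main.toml config/test.toml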
# Processing logic for cache_misses
[transforms.cache_misses]
type = "filter" # required
inputs = ["parsing"] # required
# A more complex condition would look like: condition = '.tlsVersion == "TLSv1" && .reqMethod == "GET"'
# For details on "condition", see https://vector.dev/docs/reference/configuration/transforms/filter/#condition
condition = '.cacheStatus == "0"'
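As the comment above notes, the shorthand string is a VRL expression. The same condition can also be written in the explicit table form, which is convenient once the expression grows. A sketch, replacing the one-liner above (the extra statusCode clause is just an illustration):
# Explicit form of the filter condition (illustrative field combination)
[transforms.cache_misses.condition]
type = "vrl"
source = '.cacheStatus == "0" && .statusCode != "404"'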
[transforms.logs2metrics-cache_misses]
# General
type = "log_to_metric" # required
inputs = ["cache_misses"] # required
# Metrics
[[transforms.logs2metrics-cache_misses.metrics]]
# General: the resulting metric key is website_cache_misses (format: namespace + name)
field = "cacheStatus" # required
name = "cache_misses" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required
# Processing logic for cache_hits
[transforms.cache_hits]
type = "filter" # required
inputs = ["parsing"] # required
condition = '.cacheStatus == "1"'
[transforms.logs2metrics-cache_hits]
# General
type = "log_to_metric" # required
inputs = ["cache_hits"] # required
# Metrics
[[transforms.logs2metrics-cache_hits.metrics]]
# General: the resulting metric key is website_cache_hits (format: namespace + name)
field = "cacheStatus" # required
name = "cache_hits" # optional, no default
namespace = "website" # optional, no default
type = "counter" # required
# Expose the metrics on an endpoint that Prometheus can scrape
[sinks.prometheus_exporter]
type = "prometheus_exporter" # required
inputs = ["logs2metrics*"] # required, 将上游logs2metrics中logs2metrics开头的数据都暴露出来
address = "0.0.0.0:9080" # required
# Write the log events to Elasticsearch
[sinks.es]
inputs = [ "parsing" ]
type = "elasticsearch"
endpoints = ["http://192.168.31.181:9200"]
[sinks.es.batch]
max_events = 1000
timeout_secs = 2
[sinks.es.buffer]
type = "disk"
max_size = 536870976
when_full = "block"
[sinks.es.bulk]
index = "vector-new-%Y.%m.%d"上面为了演示方便简化了流程,直接写到es中。
生产上一般是在端上使用vector将日志写到kafka,然后再使用额外的vector去消费kafka数据写到es中。
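A rough sketch of that production layout, assuming a Kafka cluster reachable at kafka:9092 and a topic named app-logs (both names are made up here): the edge Vector replaces the es sink with a kafka sink, and the aggregator Vector consumes the topic and writes to Elasticsearch.
# Edge node: ship parsed events to Kafka instead of Elasticsearch (illustrative)
[sinks.to_kafka]
type = "kafka"
inputs = [ "parsing" ]
bootstrap_servers = "kafka:9092"
topic = "app-logs"
encoding.codec = "json"

# Aggregator node: consume from Kafka and write to Elasticsearch (illustrative)
[sources.from_kafka]
type = "kafka"
bootstrap_servers = "kafka:9092"
group_id = "vector-es-writer"
topics = [ "app-logs" ]
decoding.codec = "json"

[sinks.es]
type = "elasticsearch"
inputs = [ "from_kafka" ]
endpoints = ["http://192.168.31.181:9200"]
[sinks.es.bulk]
index = "vector-new-%Y.%m.%d"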
> ./bin/vector -c config/main.toml -c config/test.toml
> for i in {1..5}; do cat /tmp/template.json >> /tmp/a.json ; done
> curl 127.0.0.1:9080/metrics
The output looks something like this:
# HELP website_cache_hits cache_hits
# TYPE website_cache_hits counter
website_cache_hits 10 1747119770005
# HELP website_cache_misses cache_misses
# TYPE website_cache_misses counter
website_cache_misses 10 1747119770005

Check the data in Elasticsearch:
> curl -s 127.0.0.1:9200/_cat/indices | grep vector-new
> curl -s 127.0.0.1:9200/vector-new-2025.05.13/_search | jq .   # output omitted here
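Two optional checks, assuming a reasonably recent Vector release: `vector validate` verifies the configuration files before startup, and since the API was enabled in main.toml, the health endpoint and `vector top` can be used to inspect the running instance.
> ./bin/vector validate config/main.toml config/test.toml
> curl 127.0.0.1:8686/health
> ./bin/vector top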
Reference: https://xie.infoq.cn/article/7d647a5d5502cb0d7478aa56b