日志结构化-Grok

最近更新时间:2024-10-18 11:45:21

我的收藏

场景描述

小王通过 Kafka 协议上传日志 将 Beats 采集的日志上报到 CLS,思路如下:
1. 使用 grok 函数对日志进行结构化。
2. 使用日志中 time 字段替换 CLS 的日志时间(__TIMESTAMP__)。




原始日志


{
"__FILENAME__": "",
"__SOURCE__": "192.168.100.123",
"message": "2024-10-11 15:32:10.003 DEBUG [gateway,746db87efd1bbcf5434cb9835c59e522,47c3036810e0c33b] [scheduled-Thread-1] c.i.g.c.f.d.a.task.AppleHealthCheckTask"
}

加工结果

{
"__FILENAME__":"",
"__SOURCE__":"192.168.100.123",
"__TIMESTAMP__":"1728631930003",
"level":"DEBUG",
"service":"gateway",
"spanid":"47c3036810e0c33b",
"time":"2024-10-11 15:32:10.003",
"traceid":"746db87efd1bbcf5434cb9835c59e522"
}

加工语句

// 使用grok函数从日志中提取时间time,日志级别level,service,traceid和spanid
ext_grok("message",grok="%{TIMESTAMP_ISO8601:time} %{DATA:level} \\[%{DATA:service},%{DATA:traceid},%{DATA:spanid}\\]")
//删除message字段
fields_drop("message")
// custom_cls_log_time函数,使用新字段time替换CLS的日志时间(__TIMESTAMP__)
custom_cls_log_time(dt_to_timestamp(v("time"), zone="UTC+8"))