有奖捉虫:云通信与企业服务文档专题,速来> HOT

操作场景

在使用 CKafka 连接器在进行数据流入流出服务时,可能会遇到如下情况:
需要对消息中的特定字段进行解析,得到相关信息。
需要多次迭代处理消息中某一字段。
需要处理为未经过结构化的原始消息后,才能继续使用。
需要处理多层嵌套格式的消息。
目前 CKafka 团队的技术专家经过长期的经验总结,全新推出了数据处理 V2.0 版本。 该版本在完美兼容原有数据处理 V1.0 的基础上,还增加了以下特性:
插件丰富:数据处理支持丰富的处理预制插件,利于快速生成所需的消费格式。
链式处理:数据处理支持消息链式处理,即上一次的处理结果能够作为本次处理的输入参数,利于处理复杂结构。
可视预览:数据处理支持实时 JSON 结构化预览,利于迅速定位排查问题。
类型转换:数据处理支持不同数据类型间的转换,利于校验数据格式。

运行原理

整个数据处理过程如下图所示,各个组件结构说明如下:


数据处理组件集群中,多个 worker 共同组成一个消费者组,批量读取源 topic 中的消息,并且对于每条消息顺序执行处理操作。
对于每条消息,根据控制台设置的处理链顺序,依次展开消息字段的嵌套结构,并进行替换/截取/数据转换/时间格式化等操作。
下一条处理链读取上一条处理链结果,并且进行本轮的链式处理,并将处理结果投递到下一轮。
执行完最后一轮处理链后的结果,投递到设置的目标 topic 中,至此完成该条消息的数据处理操作。

前提条件

需开通 CKafka 服务。
需消息格式为 JSON 格式或字符串格式,目前暂不支持其他编码协议。

实际应用

处理字符串类型格式日志

Nginx 格式日志通常如下所示,这种 Nginx 默认格式也被称作 combined,可以从中解析出请求者信息,报文信息,请求状态等信息,以便于对数据进行进一步分析。
66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1" 404 177 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
由于 Nginx 的 combined 格式采用空格和 - 分隔符来分隔数据,因此按照下文顺序设计解析流程:
1. 首先使用 "分隔符 进行初步数据解析,此时数据处理将会自动将日志转换成 JSON 结构
{
"0": "66.249.65.159 - - [06/Nov/2014:19:10:38 +0600] ",
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
"2": " 404 177 ",
"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
}
2. 从上文 JSON 结构中可知,键名为 02 的字段由于 - 和空格的影响,还存在连接的耦合数据。因此再分别对这两个键进行 - 和空格进行 分隔符 拆分。 拆分后的 JSON 结果如下所示:
{
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"0.0": "66.249.65.159 ",
"0.2": " [06/Nov/2014:19:10:38 +0600] ",
"2.1": "404",
"2.2": "177"
}
3. 由于时间格式前后还带有 [] 方括号,再使用一次 分隔符 进行截取,截取后的 JSON 如下所示:
{
"1": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
"5": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"0.0": "66.249.65.159 ",
"0.2": "06/Nov/2014:19:10:38 +0600",
"2.1": "404",
"2.2": "177"
}
4. 由于所有字段都已经被合适拆分,最后根据对应字段属性,给相应映射字段的 key 设置名称即可。修改后最终结果如下所示:
{
"request": "GET /news/53f8d72920ba2744fe873ebc.html HTTP/1.1",
"http_user_agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)",
"remote_addr": "66.249.65.159 ",
"dateTime": "06/Nov/2014:19:10:38 +0600",
"status": "404",
"body_bytes_sent ": "177"
}

处理嵌套类型格式日志

TKE 采集格式通常如下所示,TKE 采集器会将 metadata 数据放入 JSON 结构中的 kubernetes 字段中,采集到的日志放入 log 字段中。大致结构如下所示:
{
"@timestamp": 1648803500.63659,
"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"kubernetes": {
"pod_name": "tke-es-687995d557-n29jr",
"namespace_name": "default",
"pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"labels": {
"k8s-app": "tke-es",
"pod-template-hash": "687995d557",
"qcloud-app": "tke-es"
},
"annotations": {
"qcloud-redeploy-timestamp": "1648016531476",
"tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
},
"host": "10.0.96.47",
"container_name": "nginx",
"docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"container_image": "nginx"
}
}
当日志采集后投递进 ElasticSearch 时,由于支持嵌套 JSON 结构,因此不需要对数据做出太大改动。但当需要投递到数据库时,此时就需要将数据转换成能够识别的单层 JSON 格式,因此按照下文顺序设计解析流程:
1. 使用 JSONPATH 语句处理第一层嵌套的 JSON 结构 $.kubernetes,解析模式选择 JSON。首先将嵌套 JSON 结构转换为单层 JSON 结构。测试后结果如下所示:
{
"@timestamp": 1.64880350063659E9,
"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
"$.kubernetes.namespace_name": "default",
"$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"$.kubernetes.labels": {
"k8s-app": "tke-es",
"pod-template-hash": "687995d557",
"qcloud-app": "tke-es"
},
"$.kubernetes.annotations": {
"qcloud-redeploy-timestamp": "1648016531476",
"tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
},
"$.kubernetes.host": "10.0.96.47",
"$.kubernetes.container_name": "nginx",
"$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"$.kubernetes.container_image": "nginx"
}
2. 依次处理第二层嵌套结构 $.kubernetes.annotations$.kubernetes.labels。在处理链中使用 Map 方式选中这两个名称,即可将嵌套格式转换成单层 JSON 格式。处理后如下所示:
{
"@timestamp": 1648803500.63659,
"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"$.kubernetes.pod_name": "tke-es-687995d557-n29jr",
"$.kubernetes.namespace_name": "default",
"$.kubernetes.pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"$.kubernetes.host": "10.0.96.47",
"$.kubernetes.container_name": "nginx",
"$.kubernetes.docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba",
"$.kubernetes.container_hash": "nginx@sha256:e1211ac17b29b585ed1aee166a17fad63d344bc973bc63849d74c6452d549b3e",
"$.kubernetes.container_image": "nginx",
"$.kubernetes.labels.k8s-app": "tke-es",
"$.kubernetes.labels.pod-template-hash": "687995d557",
"$.kubernetes.labels.qcloud-app": "tke-es",
"$.kubernetes.annotations.qcloud-redeploy-timestamp": "1648016531476",
"$.kubernetes.annotations.tke.cloud.tencent.com/networks-status": "[{\\n \\"name\\": \\"tke-bridge\\",\\n \\"interface\\": \\"eth0\\",\\n \\"ips\\": [\\n \\"172.16.0.31\\"\\n ],\\n \\"mac\\": \\"ae:61:12:4a:c2:ba\\",\\n \\"default\\": true,\\n \\"dns\\": {}\\n}]"
}
3. 最后修改相应映射字段的 key 为所需名称,并且删去不需要的字段。此时单击 添加处理链 后,打开 处理上层所有结果 按钮,整理优化后可以参见如下所示:
{
"@timestamp": 1.64880350063659E9,
"@filepath": "/var/log/tke-log-agent/test7/c816991f-adfe-4617-8cf3-9997aea90ded/c_tke-es-687995d557-n29jr_default_nginx-add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba/jgw_INFO_2022-02-10_15_4.log",
"log": "15:00:00.000[4349811564226374227] [http-nio-8081-exec-64] INFO com.qcloud.jgw.gateway.server.topic.TopicService",
"pod_name": "tke-es-687995d557-n29jr",
"namespace_name": "default",
"pod_id": "c816991f-adfe-4617-8cf3-9997aea90ded",
"host": "10.0.96.47",
"container_name": "nginx",
"docker_id": "add90ccf49626ef42d5615a636aae74d6380996043cf6f6560d8131f21a4d8ba"
}
说明
在使用 JSONPath 处理参数时,当 key 中本身带有英文句号 . 时,需要在路径中方括号和单引号进行隔离。
例如 {"key1.key2":"value1"} 中,要想取得对应字段,则需要使用 $.['key1.key2'] 进行获取相应键值。

处理字符串序列化 JSON 格式日志

有时 JSON 格式在传输过程中,由于格式或性能等需要,需要转义成字符串格式的形式,此处把这种格式叫做 Raw JSON。需要在数据处理中重新将字符串反序列化成 JSON 格式。 此处以 MongoDB 中的 Raw JSON 格式为例,大致结构如下所示:
{
"key": " {\\n \\"categories\\": [\\"dev\\"],\\n \\"created_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"icon_url\\": \\"https://assets.chucknorris.host/img/avatar/chuck-norris.png\\",\\n \\"id\\": \\"elgv2wkvt8ioag6xywykbq\\",\\n \\"updated_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"url\\": \\"https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq\\",\\n \\"value\\": \\"Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris.\\"\\n }\\n"
}
如果在数据处理中就将 Raw JSON 转换成普通的 JSON 格式,就能按照字段格式直接投递到目标下游中。大致处理思路如下所示:
1. 首先设置 解析模式 为 JSON。这样能够将消息先预读取成内部的 MAP 格式。解析后如下所示:
{
"key": " {\\n \\"categories\\": [\\"dev\\"],\\n \\"created_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"icon_url\\": \\"https://assets.chucknorris.host/img/avatar/chuck-norris.png\\",\\n \\"id\\": \\"elgv2wkvt8ioag6xywykbq\\",\\n \\"updated_at\\": \\"2020-01-05 13:42:19.324003\\",\\n \\"url\\": \\"https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq\\",\\n \\"value\\": \\"Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris.\\"\\n }\\n"
}
2. 单击 添加处理链,使用 MAP 方式选中 key,解析方式选择 JSON。这样数据处理就能将 RAW JSON 自动转换成 JSON 形式。解析后如下所示:
{
"key.categories": [
"dev"
],
"key.created_at": "2020-01-05 13:42:19.324003",
"key.icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
"key.id": "elgv2wkvt8ioag6xywykbq",
"key.updated_at": "2020-01-05 13:42:19.324003",
"key.url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
"key.value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
}
3. 最后单击 添加处理链 后,打开 处理上层所有结果 按钮,修改相应映射字段的 key 为所需名称,并且删去不需要的字段。整理优化后可以参见如下所示:
{
"categories": [
"dev"
],
"created_at": "2020-01-05 13:42:19.324003",
"icon_url": "https://assets.chucknorris.host/img/avatar/chuck-norris.png",
"id": "elgv2wkvt8ioag6xywykbq",
"updated_at": "2020-01-05 13:42:19.324003",
"url": "https://api.chucknorris.io/jokes/elgv2wkvt8ioag6xywykbq",
"value": "Chuck Norris's keyboard doesn't have a Ctrl key because nothing controls Chuck Norris."
}
说明
目前 RAW JSON 只支持解析 MAP 类型的数据。当最外层为 List 类型时,例如 "[\\"test1\\",\\"test2\\"]",或者 "[{\\"key\\":\\"value\\"}]",由于无法解析合适的键值,因此将会提示解析失败。