数据处理规则说明

最近更新时间:2025-08-06 12:10:02

我的收藏

概览

在通过 CKafka 连接器处理数据流入流出任务时,通常需要对数据进行简易的清洗操作,如格式化原始数据、解析特定字段、数据格式转换等等。开发者往往需要自己搭建一套数据清洗的服务(ETL)。
Logstash 是一款免费且开放的服务器端数据处理管道,能够从多个数据源采集数据,转换数据,然后将数据发送到相应的“存储库”中。 logstash 拥有丰富的过滤器插件,这使得 logstash 成为了被广泛使用的一款功能强大的数据转换工具。
然而搭建、配置、维护自己的 logstash 服务会增大开发和运维的难度,为此 CKafka 提供了一套对标 logstash 的数据处理服务,开发者仅需通过控制台交互界面就可以新建自己的数据处理任务。数据处理服务允许用户编辑相应的数据处理规则,支持构建链式处理,同时可以预览数据处理的结果,帮助用户轻松高效的构建一套数据处理服务,满足数据清洗和转换的需求。




功能对标清单

Logstash
连接器数据处理服务
功能
Codec.json
Filter.grok
Filter.mutate.split
Filter.date
Filter.json
Filter.mutate.convert
Filter.mutate.gsub
Filter.mutate.strip
Filter.mutate.join
Filter.mutate.rename
Filter.mutate.update
Filter.mutate.replace
Filter.mutate.add_field
Filter.mutate.remove_field
Filter.mutate.copy
Filter.mutate.merge

TODO
Filter.mutate.uppercase

TODO
Filter.mutate.lowercase

TODO

操作方法介绍

数据解析

通过选择相应的数据解析模式 ,并一键单击即可预览:




日期格式处理

1. 输入带日期格式的原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
2. 解析结果如下:

3. CKafka 连接器处理方式:
3.1 通过预设系统当前时间给某字段赋值

3.2 在数据处理模块通过 处理 value 功能来对日期数据进行处理。



处理模式选择转换时间格式,选择好时间格式、时区和日期格式,并确认

4. 单击测试,可以看到转换后的时间格式。





解析内部 JSON 结构

1. 输入带嵌套 JSON 格式的原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
2. 解析结果如下:



3. 通过对该字段选择 MAP 操作来对其进行解析,从而把特定字段解析为 JSON 格式:




数据修改

输入原始数据,以下为一个示例。
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131
}
解析结果如下:



连接器处理方式如下:
方式一:通过选择相应的处理 value 功能一键定义规则。



方式二:通过选择数据类型一键更改相应字段的数据格式。



更改前:



更改后:



方式三:通过 JSONPath 语法实现 join 的拼接功能。如使用 $.concat($.data.Response.SubnetSet[0].VpcId, \\"#\\", $.data.Response.SubnetSet[0].SubnetId, \\"#\\", $.data.Response.SubnetSet[0].CidrBlock) 语法拼接 Vpc 和子网的属性,并且通过 # 字符加以分割。关于 JSONPath 语法的详细介绍请参见 JsonPath 说明




字段修改

在通过 CKafka 连接器进行数据处理的过程中,可以使用多种方式对解析后的数据字段进行编辑修改,以获得您理想的数据。如:
在 KEY 栏可以对字段名称进行修改。
在 VALUE 栏可以选择复制某字段的值。
在下方单击添加可以新增字段。
在右侧单击

按钮可以删除字段。





实际案例演示

案例1:多级字段解析

输入 message:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"beat": {
"hostname": "test-server",
"ip": "6.6.6.6",
"version": "5.6.9"
},
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"offset": 3030131,
}
目标 message :
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}
连接器配置方法:
1.1 处理链 1 配置如下:



1.2 处理链 1 结果如下:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"message": "{\\"userId\\":888,\\"userName\\":\\"testUser\\"}",
"hostname": "test-server",
"ip": "6.6.6.6"
}
1.3 处理链 2 配置如下:



1.4 处理链 2 结果如下:
{
"@timestamp": "2022-02-26T22:25:33.210Z",
"input_type": "log",
"hostname": "test-server",
"ip": "6.6.6.6",
"userId": 888,
"userName": "testUser"
}

案例2:非 JSON 数据解析

输入 message :
region=Shanghai$area=a1$server=6.6.6.6$user=testUser$timeStamp=2022-02-26T22:25:33.210Z
目标 message:
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}
连接器配置方法:
1.1 使用分隔符 $ 对原始 message 进行解析



1.2 初步解析结果:
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z"
}
1.3 使用分隔符 = 对结果二次解析:



1.4 二次解析结果:
{
"0": "region=Shanghai",
"1": "area=a1",
"2": "server=6.6.6.6",
"3": "user=testUser",
"4": "timeStamp=2022-02-26T22:25:33.210Z",
"0.region": "Shanghai",
"1.area": "a1",
"2.server": "6.6.6.6",
"3.user": "testUser",
"4.timeStamp": "2022-02-26T22:25:33.210Z"
}
1.5 对字段进行编辑、删减,调整时间戳格式,并新增当前系统时间字段:





最终结果:
{
"region": "Shanghai",
"area": "a1",
"server": "6.6.6.6",
"user": "testUser",
"timeStamp": "2022-02-27 06:25:33",
"processTimeStamp": "2022-06-27 11:14:49"
}