日志消费 CLS

最近更新时间:2025-09-04 14:21:32

我的收藏

介绍

CLS(Cloud Log Service)日志服务,可以作为 Oceanus 的数据源(Source)和数据目的(Sink),用户可以通过 Oceanus 导入和导出 CLS 日志,参与流式计算。

版本说明

Flink 版本
说明
1.11
支持
1.13
支持
1.14
支持
1.16
支持
1.18
支持

使用范围

CLS 支持用作数据源表(Source)和数据目的(Sink)。

CLS日志主题

建立 CLS Source 或者 Sink 表前,需要创建 CLS 日志主题,具体步骤如下:
注意:
建立 CLS Sink 端只需要进行1,2步骤操作。
1. 进入 CLS 日志主题 页面,单击创建日志主题

2. 日志主题创建成功后,在列表页单击新建日志主题的日志主题名称/ID 进入到详情页面。


3. 建立 CLS Source 端需要在详情页,切换到 Kafka协议消费页签,打开 Kafka 协议消费的功能。



4. 开启后的状态如下:



5. 若有疑问,请参见 Kafka 协议消费

Oceanus 引入 CLS 日志源端或者目的端

在 Oceanus 控制台新建作业。



编写 SQL,注意表字段需要换成自己 CLS 日志主题对应的字段。

Source 端创建语句

CREATE TABLE `cls_source` (
-- 需要自己替换成CLS日志中相应的字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
`partition_id` BIGINT METADATA
FROM
'partition' VIRTUAL,
-- kafka分区
`ts` TIMESTAMP(3) METADATA
FROM
'timestamp'
) WITH (
'connector' = 'kafka',
-- cls kafka协议消费控制台给出的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
-- 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
-- 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
-- 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
-- 密码是用户的SecretId#SecretKey组合的字符串,比如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);

Sink 端创建语句

CREATE TABLE `cls_sink` (
-- CLS对应日志主题的字段
`logNo` STRING,
`city` STRING,
`__CONTENT__` STRING,
`__PKG_LOGID__` STRING
) WITH (
'connector' = 'kafka',
-- CLS日志主题基本信息中的日志主题ID
'topic' = 'CLS日志主题ID',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
-- CLS日志主题基本信息中通过Kafka协议上报数据的域名
'properties.bootstrap.servers' = '${region}.cls.tencentyun.com:9095',
-- 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
-- 密码是用户的SecretId#SecretKey组合的字符串,比如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
说明:
flink 版本 sasl 认证配置 对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule
1.16版本及以上:org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule

注意事项

前提条件及相关限制,请参见 Kafka 协议消费