介绍
CLS(Cloud Log Service)日志服务,可以作为 Oceanus 的数据源(Source)和数据目的(Sink),用户可以通过 Oceanus 导入和导出 CLS 日志,参与流式计算。
版本说明
Flink 版本 | 说明 |
1.11 | 支持 |
1.13 | 支持 |
1.14 | 支持 |
1.16 | 支持 |
1.18 | 支持 |
使用范围
CLS 支持用作数据源表(Source)和数据目的(Sink)。
CLS日志主题
建立 CLS Source 或者 Sink 表前,需要创建 CLS 日志主题,具体步骤如下:
注意:
建立 CLS Sink 端只需要进行1,2步骤操作。
1. 进入 CLS 日志主题 页面,单击创建日志主题。

2. 日志主题创建成功后,在列表页单击新建日志主题的日志主题名称/ID 进入到详情页面。


3. 建立 CLS Source 端需要在详情页,切换到 Kafka协议消费页签,打开 Kafka 协议消费的功能。


4. 开启后的状态如下:


5. 若有疑问,请参见 Kafka 协议消费。
Oceanus 引入 CLS 日志源端或者目的端
在 Oceanus 控制台新建作业。


编写 SQL,注意表字段需要换成自己 CLS 日志主题对应的字段。
Source 端创建语句
CREATE TABLE `cls_source` (-- 需要自己替换成CLS日志中相应的字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATAFROM'partition' VIRTUAL,-- kafka分区`ts` TIMESTAMP(3) METADATAFROM'timestamp') WITH ('connector' = 'kafka',-- cls kafka协议消费控制台给出的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',-- 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',-- 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true',-- 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6-- 密码是用户的SecretId#SecretKey组合的字符串,比如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
Sink 端创建语句
CREATE TABLE `cls_sink` (-- CLS对应日志主题的字段`logNo` STRING,`city` STRING,`__CONTENT__` STRING,`__PKG_LOGID__` STRING) WITH ('connector' = 'kafka',-- CLS日志主题基本信息中的日志主题ID'topic' = 'CLS日志主题ID','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,-- CLS日志主题基本信息中通过Kafka协议上报数据的域名'properties.bootstrap.servers' = '${region}.cls.tencentyun.com:9095',-- 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6-- 密码是用户的SecretId#SecretKey组合的字符串,比如AKIDWrwkHYYHjvqhz1mHVS8YhXXXX#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
说明:
flink 版本 sasl 认证配置 对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule
1.16版本及以上:org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule