本文介绍使用流处理计算框架 Flink、腾讯云 Oceanus 来消费日志。
腾讯云 Oceanus 消费 CLS 日志
1. 在 Oceanus 控制台 新建 SQL 作业。如下图所示:


2. 编写 SQL 语句。
CREATE TABLE `nginx_source`( # 日志中字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka 分区`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
Flink 消费 CLS 日志
开启日志的 Kafka 消费协议
确认 flink-connector-kafka 依赖
确保 flink lib 中有 flink-connector-kafka 后,直接在 sql 中注册 Kafka 表即可使用。依赖如下:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.14.4</version></dependency>
注册 flink 表
CREATE TABLE `nginx_source`(#日志中的字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,# kafka 分区`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
注意:
flink 版本 sasl 认证配置对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule。
1.16版本及以上:org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。
查询使用
执行成功后,即可查询使用。
select count(*) , host from nginx_source group by host;