消费 Demo-流处理

最近更新时间:2025-06-27 15:38:52

我的收藏
本文介绍使用流处理计算框架 Flink、腾讯云 Oceanus 来消费日志。

腾讯云 Oceanus 消费 CLS 日志

1. 在 Oceanus 控制台 新建 SQL 作业。如下图所示:



2. 编写 SQL 语句。
CREATE TABLE `nginx_source`
( # 日志中字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka 分区
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac,注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);


Flink 消费 CLS 日志

开启日志的 Kafka 消费协议

参见 操作步骤 开启日志的 Kafka 消费协议,并获取消费的服务域名和 Topic。

确认 flink-connector-kafka 依赖

确保 flink lib 中有 flink-connector-kafka 后,直接在 sql 中注册 Kafka 表即可使用。依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.14.4</version>
</dependency>

注册 flink 表

CREATE TABLE `nginx_source`
(
#日志中的字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
# kafka 分区
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
# 用户名是日志集合ID,例如 ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可,注意 jaas.config 最后有;分号,不填写会报错。
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
注意:
flink 版本 sasl 认证配置对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule。
1.16版本及以上:org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule。

查询使用

执行成功后,即可查询使用。
select count(*) , host from nginx_source group by host;