使用 Kafka 协议消费日志

最近更新时间:2024-11-12 20:54:32

我的收藏
使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。可将采集到 CLS 的日志数据,消费到下游的大数据组件或者数据仓库。
本文提供了开源组件 Flink、Flume、Logstash、以及腾讯云 Oceanus 消费日志主题的 Demo。

前提条件

已开通日志服务,创建日志集与日志主题,并成功采集到日志数据。
确保当前操作账号拥有开通 Kafka 协议消费的权限,权限问题请参见 CLS 访问策略模板
注意:
开启该功能后,默认将从最新数据开始消费。消息保留时间2小时,超过2小时还未消费的数据将不会被保存。如您需要消费更多历史数据,请 联系我们 单独开通。 例如您12:00打开 Kafka 协议消费,那么您将从12:00的数据开始消费,直到15:00,您可以消费13:00 - 15:00的数据,13:00之前的数据不能消费。

内网消费和外网消费说明

内网消费:使用内网域名进行日志消费,流量费用为0.18元/GB。例如您的原始日志为100GB,消费时选择 Snappy 压缩,那么计量约为50GB,内网读流量费用为50GB * 0.18元,即9元。一般来说,如果您的消费端和日志主题在同一个 VPC 或者同一个地域,就可以使用内网消费。
外网消费:使用外网域名进行日志消费,流量费用为0.8元/GB。例如您的原始日志为100GB,消费时选择 Snappy 压缩,那么计量约为50GB,外网读流量费用为50GB * 0.8元,即40元。一般来说,如果您的消费端和日志主题不在同一个 VPC,也不在同一个地域,需要使用外网消费。




操作步骤

1. 登录日志服务控制台,选择左侧导航栏中的 日志主题
2. 在“日志主题”页面,单击需要使用 Kafka 协议消费的日志主题 ID/名称,进入日志主题管理页面。
3. 在日志主题管理页面中,单击 Kafka 协议消费页签。
4. 单击右侧的编辑,将“当前状态”的开关按钮设置为打开状态后,单击确定
5. 控制台给出 Topic、Host+Port 的信息。您可以复制该信息,构造您的消费者(KafkaConsumer)。您也可以使用自动生成消费者小工具,生成一个可运行的消费客户端,如有其他业务逻辑请修改代码。



说明:
如您需要从日志主题最早的数据开始消费,请在消费者中指定 offset 为 earliest;如您需要从当前数据开始消费,请指定 offset 为 latest。
消费分区的个数和日志主题的分区个数是相同的,您可在日志主题详情中查看分区个数。

消费者参数说明

参数
说明
用户认证方式
目前仅支持 SASL_PLAINTEXT。
hosts
内网消费:kafkaconsumer-${region}.cls.tencentyun.com:9095。
外网消费:kafkaconsumer-${region}.cls.tencentcs.com:9096,详细请参见 可用域名 - Kafka 消费日志
topic
消费主题 ID,请在 Kafka 协议消费的控制台复制。例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX。
username
配置为${LogSetID},即日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXX ,请在Kafka协议消费的控制台复制。
password
配置为${SecretId}#${SecretKey}。例如:XXXXXXXXXXXXXX#YYYYYYYY,请登录 腾讯云访问管理,在左侧导航栏中单击访问密钥,API 密钥或者项目密钥均可使用,建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
注意:
下面的例子中的 Java 代码,jaas.config 的配置,${SecretId}#${SecretKey}后有(;分号),不要漏填,否则会报错。

Python SDK

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',
group_id = uuid.uuid4().hex,
auto_offset_reset='earliest',
#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
sasl_plain_username = "${logsetID}",
#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (1,1,1)
)
print('begin')
for message in consumer:
print('begins')
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
print('end')

Java SDK

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

public static void consume() {
Properties props = new Properties();
String logset_id = "logset_id";
// CLS 控制台kafka协议消费页面展示的主题名称
String topic_id = "kafka_topic_name";

String accessKeyID = System.getenv("SecretId");
String accessKeySecret = System.getenv("SecretKey");

String groupId = "kafka-test";

#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
String hosts = "kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9095";
props.put("bootstrap.servers", hosts);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +
logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");

// Kafka消费者配置
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "30000");
props.put("session.timeout.ms", "130000");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.interval.ms", "120000");
props.put("heartbeat.interval.ms", "5000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建Kafka消费者实例
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList(topic_id));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
public static void main(String[] args){
consume();
}
}

Logstash 消费 CLS 日志

input {
kafka {
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
topics => "您的消费主题"
#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"
group_id => "您的消费组id"
security_protocol => "SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"
}
}
output {
stdout { codec => json }
}


腾讯云 Oceanus 消费 CLS 日志

在 Oceanus 控制台新建作业。如下图所示:


CREATE TABLE `nginx_source`
( # 日志中字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka分区
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);


Flink 消费 CLS 日志

开启日志的 kafka 消费协议

参见 操作步骤 开启日志的 kafka 消费协议,并获取消费的服务域名和 Topic。

确认 flink-connector-kafka 依赖

确保 flink lib 中有 flink-connector-kafka 后,直接在 sql 中注册 kafka 表即可使用。依赖如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.14.4</version>
</dependency>

注册 flink 表

CREATE TABLE `nginx_source`
(
#日志中的字段
`@metadata` STRING,
`@timestamp` TIMESTAMP,
`agent` STRING,
`ecs` STRING,
`host` STRING,
`input` STRING,
`log` STRING,
`message` STRING,
#kafka分区
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'topic' = '您的消费主题',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',
# 请替换为您的消费组名称
'properties.group.id' = '您的消费组名称',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true' ,
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN'
);
注意:
flink 版本 sasl 认证配置对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule。
1.16版本及以上:'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule'。

查询使用

执行成功后,即可查询使用。
select count(*) , host from nginx_source group by host;

Flume 消费 CLS 日志

若您需将日志数据消费到自建的 HDFS,Kafka 集群,则可以通过 Flume 组件来中转,具体操作参见如下示例。
说明:
请使用1.9.0及以上的 Flume 版本。

开启日志的 kafka 消费协议

参见 操作步骤 开启日志的 Kafka 消费协议,并获取消费的服务域名和 Topic。

Flume 配置

a1.sources = source_kafka
a1.sinks = sink_local
a1.channels = channel1

#配置Source
a1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source_kafka.batchSize = 10
a1.sources.source_kafka.batchDurationMillis = 200000
#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095
#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
a1.sources.source_kafka.kafka.topics = 您的消费主题
#请替换为您的消费组名称
a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称
a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliest
a1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN
#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.
a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";

//配置sink
a1.sinks.sink_local.type = logger

a1.channels.channel1.type = memory
a1.channels.channel1.capacity = 1000
a1.channels.channel1.transactionCapacity = 100

//将source和sink绑定到channel
a1.sources.source_kafka.channels = channel1
a1.sinks.sink_local.channel = channel1