使用 Kafka 协议消费功能,您可以将一个日志主题,当作一个 Kafka Topic 来消费。可将采集到 CLS 的日志数据,消费到下游的大数据组件或者数据仓库。
本文提供了开源组件 Flink、Flume、Logstash、以及腾讯云 Oceanus 消费日志主题的 Demo。
前提条件
已开通日志服务,创建日志集与日志主题,并成功采集到日志数据。
确保当前操作账号拥有开通 Kafka 协议消费的权限,权限问题请参见 CLS 访问策略模板。
注意:
开启该功能后,默认将从最新数据开始消费。消息保留时间2小时,超过2小时还未消费的数据将不会被保存。如您需要消费更多历史数据,请 联系我们 单独开通。
例如您12:00打开 Kafka 协议消费,那么您将从12:00的数据开始消费,直到15:00,您可以消费13:00 - 15:00的数据,13:00之前的数据不能消费。
内网消费和外网消费说明
内网消费:使用内网域名进行日志消费,流量费用为0.18元/GB。例如您的原始日志为100GB,消费时选择 Snappy 压缩,那么计量约为50GB,内网读流量费用为50GB * 0.18元,即9元。一般来说,如果您的消费端和日志主题在同一个 VPC 或者同一个地域,就可以使用内网消费。
外网消费:使用外网域名进行日志消费,流量费用为0.8元/GB。例如您的原始日志为100GB,消费时选择 Snappy 压缩,那么计量约为50GB,外网读流量费用为50GB * 0.8元,即40元。一般来说,如果您的消费端和日志主题不在同一个 VPC,也不在同一个地域,需要使用外网消费。
操作步骤
1. 登录日志服务控制台,选择左侧导航栏中的 日志主题。
2. 在“日志主题”页面,单击需要使用 Kafka 协议消费的日志主题 ID/名称,进入日志主题管理页面。
3. 在日志主题管理页面中,单击 Kafka 协议消费页签。
4. 单击右侧的编辑,将“当前状态”的开关按钮设置为打开状态后,单击确定。
5. 控制台给出 Topic、Host+Port 的信息。您可以复制该信息,构造您的消费者(KafkaConsumer)。您也可以使用自动生成消费者小工具,生成一个可运行的消费客户端,如有其他业务逻辑请修改代码。
说明:
如您需要从日志主题最早的数据开始消费,请在消费者中指定 offset 为 earliest;如您需要从当前数据开始消费,请指定 offset 为 latest。
消费分区的个数和日志主题的分区个数是相同的,您可在日志主题详情中查看分区个数。
消费者参数说明
参数 | 说明 |
用户认证方式 | 目前仅支持 SASL_PLAINTEXT。 |
hosts | 内网消费:kafkaconsumer-${region}.cls.tencentyun.com:9095。 外网消费:kafkaconsumer-${region}.cls.tencentcs.com:9096,详细请参见 可用域名 - Kafka 消费日志。 |
topic | 消费主题 ID,请在 Kafka 协议消费的控制台复制。例如 XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX。 |
username | 配置为${LogSetID},即日志集 ID。 例如:0f8e4b82-8adb-47b1-XXXX-XXXXXXXXXX ,请在Kafka协议消费的控制台复制。 |
password | 配置为 ${SecretId}#${SecretKey} 。例如:XXXXXXXXXXXXXX#YYYYYYYY,请登录 腾讯云访问管理,在左侧导航栏中单击访问密钥,API 密钥或者项目密钥均可使用,建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。 |
注意:
下面的例子中的 Java 代码,jaas.config 的配置,
${SecretId}#${SecretKey}
后有(;分号),不要漏填,否则会报错。Python SDK
import uuidfrom kafka import KafkaConsumer,TopicPartition,OffsetAndMetadataconsumer = KafkaConsumer(#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'您的消费主题',group_id = uuid.uuid4().hex,auto_offset_reset='earliest',#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6sasl_plain_username = "${logsetID}",#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (1,1,1))print('begin')for message in consumer:print('begins')print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))print('end')
Java SDK
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.0</version></dependency>import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerGroupTest {public static void consume() {Properties props = new Properties();String logset_id = "logset_id";// CLS 控制台kafka协议消费页面展示的主题名称String topic_id = "kafka_topic_name";String accessKeyID = System.getenv("SecretId");String accessKeySecret = System.getenv("SecretKey");String groupId = "kafka-test";#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写String hosts = "kafkaconsumer-ap-guangzhou.cls.tencentcs.com:9095";props.put("bootstrap.servers", hosts);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");// Kafka消费者配置props.put("group.id", groupId);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "30000");props.put("session.timeout.ms", "130000");props.put("auto.offset.reset", "earliest");props.put("max.poll.interval.ms", "120000");props.put("heartbeat.interval.ms", "5000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建Kafka消费者实例KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);consumer.subscribe(Collections.singletonList(topic_id));while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());}}}public static void main(String[] args){consume();}}
Logstash 消费 CLS 日志
input {kafka {#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制topics => "您的消费主题"#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写bootstrap_servers => "kafkaconsumer-${region}.cls.tencentyun.com:9095"group_id => "您的消费组id"security_protocol => "SASL_PLAINTEXT"sasl_mechanism => "PLAIN"#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可.sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='${logsetID}' password='${securityID}#${securityKEY}';"}}output {stdout { codec => json }}
腾讯云 Oceanus 消费 CLS 日志
在 Oceanus 控制台新建作业。如下图所示:
CREATE TABLE `nginx_source`( # 日志中字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- kafka分区`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
Flink 消费 CLS 日志
开启日志的 kafka 消费协议
确认 flink-connector-kafka 依赖
确保 flink lib 中有 flink-connector-kafka 后,直接在 sql 中注册 kafka 表即可使用。依赖如下:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.14.4</version></dependency>
注册 flink 表
CREATE TABLE `nginx_source`(#日志中的字段`@metadata` STRING,`@timestamp` TIMESTAMP,`agent` STRING,`ecs` STRING,`host` STRING,`input` STRING,`log` STRING,`message` STRING,#kafka分区`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,`ts` TIMESTAMP(3) METADATA FROM 'timestamp') WITH ('connector' = 'kafka',#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'topic' = '您的消费主题',# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写'properties.bootstrap.servers' = 'kafkaconsumer-${region}.cls.tencentyun.com:9095',# 请替换为您的消费组名称'properties.group.id' = '您的消费组名称','scan.startup.mode' = 'earliest-offset','format' = 'json','json.fail-on-missing-field' = 'false','json.ignore-parse-errors' = 'true' ,#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";','properties.security.protocol' = 'SASL_PLAINTEXT','properties.sasl.mechanism' = 'PLAIN');
注意:
flink 版本 sasl 认证配置对应包:
1.16版本以下:org.apache.kafka.common.security.plain.PlainLoginModule。
1.16版本及以上:'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule'。
查询使用
执行成功后,即可查询使用。
select count(*) , host from nginx_source group by host;
Flume 消费 CLS 日志
若您需将日志数据消费到自建的 HDFS,Kafka 集群,则可以通过 Flume 组件来中转,具体操作参见如下示例。
说明:
请使用1.9.0及以上的 Flume 版本。
开启日志的 kafka 消费协议
Flume 配置
a1.sources = source_kafkaa1.sinks = sink_locala1.channels = channel1#配置Sourcea1.sources.source_kafka.type = org.apache.flume.source.kafka.KafkaSourcea1.sources.source_kafka.batchSize = 10a1.sources.source_kafka.batchDurationMillis = 200000#服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写a1.sources.source_kafka.kafka.bootstrap.servers = kafkaconsumer-${region}.cls.tencentyun.com:9095#cls kafka协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制a1.sources.source_kafka.kafka.topics = 您的消费主题#请替换为您的消费组名称a1.sources.source_kafka.kafka.consumer.group.id = 您的消费组名称a1.sources.source_kafka.kafka.consumer.auto.offset.reset = earliesta1.sources.source_kafka.kafka.consumer.security.protocol = SASL_PLAINTEXTa1.sources.source_kafka.kafka.consumer.sasl.mechanism = PLAIN#用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6#密码是用户的SecretId#SecretKey组合的字符串,例如AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的action、resource都配置为最小范围,可以满足操作即可,注意jaas.config最后有;分号,不填写会报错.a1.sources.source_kafka.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="${logsetID}" password="${SecretId}#${SecretKey}";//配置sinka1.sinks.sink_local.type = loggera1.channels.channel1.type = memorya1.channels.channel1.capacity = 1000a1.channels.channel1.transactionCapacity = 100//将source和sink绑定到channela1.sources.source_kafka.channels = channel1a1.sinks.sink_local.channel = channel1