本文介绍使用 Python、Java 消费 CLS 日志。
Python SDK
说明:
推荐您使用 Python 版本:3.9及以上。
Python kafka 客户端:kafka-python、kafka-python-ng、confluent-kafka-python。
如果您配置了数据压缩格式:格式 SNAPPY,请确认安装了 python-snappy 包;格式 LZ4,确认安装了 lz4 包。
单个消费者
import uuidfrom kafka import KafkaConsumer,TopicPartition,OffsetAndMetadataconsumer = KafkaConsumer(# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制'您的消费主题',group_id = '您的消费组名称',auto_offset_reset='earliest',# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',# 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6sasl_plain_username = "${logsetID}",# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (0,10,1))print('begin')for message in consumer:print('begins')print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))print('end')
多个消费者
from kafka import KafkaConsumerimport threadingTOPIC_NAME = '您的消费主题'GROUP_ID = '您的消费组名称'# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写BOOTSTRAP_SERVERS = ''kafkaconsumer-${region}.cls.tencentyun.com:9095''def consume_messages(thread_id):# 创建 Kafka 消费者实例consumer = KafkaConsumer(TOPIC_NAME,group_id=GROUP_ID,bootstrap_servers=BOOTSTRAP_SERVERS,value_deserializer=lambda m: m.decode('utf-8'),auto_offset_reset='earliest',security_protocol = "SASL_PLAINTEXT",sasl_mechanism = 'PLAIN',sasl_plain_username = "${logsetID}"",sasl_plain_password = "${SecretId}#${SecretKey}",api_version = (2, 5, 1))try:for message in consumer:print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")except KeyboardInterrupt:passfinally:# 关闭消费者consumer.close()if __name__ == "__main__":# 启动3个消费者线程,这是个例子,请您根据实际情况配置num_consumers = 3threads = []for i in range(num_consumers):thread = threading.Thread(target=consume_messages, args=(i,))threads.append(thread)thread.start()# 等待所有线程结束for thread in threads:thread.join()
Java SDK
注意:
下面的例子中的 Java 代码,jaas.config 的配置,
${SecretId}#${SecretKey}
后有(;分号),不要漏填,否则会报错。<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.5.0</version></dependency>import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;import java.util.Collections;import java.util.Properties;public class KafkaConsumerGroupTest {public static void consume() {Properties props = new Properties();String logset_id = "${logsetID}";// CLS 控制台 kafka 协议消费页面展示的主题名称String topic_id = "您的消费主题";String accessKeyID = System.getenv("${SecretId}");String accessKeySecret = System.getenv("${SecretKey}");String groupId = "您的消费组名称";// 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写String hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";props.put("bootstrap.servers", hosts);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");// Kafka 消费者配置props.put("group.id", groupId);props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "5000");props.put("session.timeout.ms", "10000");props.put("auto.offset.reset", "earliest");props.put("max.poll.interval.ms", "120000");props.put("heartbeat.interval.ms", "3000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 创建 Kafka 消费者实例KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);consumer.subscribe(Collections.singletonList(topic_id));while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));for (ConsumerRecord<String, String> record : records) {System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());}}}public static void main(String[] args){consume();}}