消费 Demo-多语言 SDK

最近更新时间:2025-06-27 11:06:31

我的收藏
本文介绍使用 Python、Java 消费 CLS 日志。

Python SDK

说明:
推荐您使用 Python 版本:3.9及以上。
Python kafka 客户端:kafka-python、kafka-python-ng、confluent-kafka-python。
如果您配置了数据压缩格式:格式 SNAPPY,请确认安装了 python-snappy 包;格式 LZ4,确认安装了 lz4 包。

单个消费者

import uuid
from kafka import KafkaConsumer,TopicPartition,OffsetAndMetadata
consumer = KafkaConsumer(
# cls kafka 协议消费控制台给出的的主题名称,例如XXXXXX-633a268c-XXXX-4a4c-XXXX-7a9a1a7baXXXX,可在控制台复制
'您的消费主题',
group_id = '您的消费组名称',
auto_offset_reset='earliest',
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
bootstrap_servers = ['kafkaconsumer-${region}.cls.tencentyun.com:9095'],
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
# 用户名是日志集合ID,例如ca5cXXXXdd2e-4ac0af12-92d4b677d2c6
sasl_plain_username = "${logsetID}",
# 密码是用户的 SecretId#SecretKey 组合的字符串,例如 AKID********************************#XXXXuXtymIXT0Lac注意不要丢失#。建议使用子账号密钥,主账号为子账号授权时,遵循最小权限原则,即子账号的访问策略中的 action、resource 都配置为最小范围,可以满足操作即可。
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (0,10,1)
)
print('begin')
for message in consumer:
print('begins')
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
print('end')

多个消费者

from kafka import KafkaConsumer
import threading

TOPIC_NAME = '您的消费主题'
GROUP_ID = '您的消费组名称'
# 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
BOOTSTRAP_SERVERS = ''kafkaconsumer-${region}.cls.tencentyun.com:9095''

def consume_messages(thread_id):
# 创建 Kafka 消费者实例
consumer = KafkaConsumer(
TOPIC_NAME,
group_id=GROUP_ID,
bootstrap_servers=BOOTSTRAP_SERVERS,
value_deserializer=lambda m: m.decode('utf-8'),
auto_offset_reset='earliest',
security_protocol = "SASL_PLAINTEXT",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "${logsetID}"",
sasl_plain_password = "${SecretId}#${SecretKey}",
api_version = (2, 5, 1)
)

try:
for message in consumer:
print(f"Thread {thread_id}: partition = {message.partition}, offset = {message.offset}, value = {message.value}")
except KeyboardInterrupt:
pass
finally:
# 关闭消费者
consumer.close()

if __name__ == "__main__":
# 启动3个消费者线程,这是个例子,请您根据实际情况配置
num_consumers = 3
threads = []
for i in range(num_consumers):
thread = threading.Thread(target=consume_messages, args=(i,))
threads.append(thread)
thread.start()

# 等待所有线程结束
for thread in threads:
thread.join()

Java SDK

注意:
下面的例子中的 Java 代码,jaas.config 的配置,${SecretId}#${SecretKey} 后有(;分号),不要漏填,否则会报错。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
</dependency>
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerGroupTest {

public static void consume() {
Properties props = new Properties();
String logset_id = "${logsetID}";
// CLS 控制台 kafka 协议消费页面展示的主题名称
String topic_id = "您的消费主题";

String accessKeyID = System.getenv("${SecretId}");
String accessKeySecret = System.getenv("${SecretKey}");

String groupId = "您的消费组名称";

// 服务地址+端口,外网端口9096,内网端口9095,例子是内网消费,请根据您的实际情况填写
String hosts = "kafkaconsumer-${region}.cls.tencentyun.com:9095";
props.put("bootstrap.servers", hosts);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\"" +
logset_id + "\\" password=\\"" + accessKeyID + "#" + accessKeySecret + "\\";");

// Kafka 消费者配置
props.put("group.id", groupId);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
props.put("session.timeout.ms", "10000");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.interval.ms", "120000");
props.put("heartbeat.interval.ms", "3000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 创建 Kafka 消费者实例
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Collections.singletonList(topic_id));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
}
}
}
public static void main(String[] args){
consume();
}
}