操作场景
该任务以 Python 客户端为例指导您使用 VPC 网络接入消息队列 CKafka 版并收发消息。
前提条件
下载 Demo
已参考 SDK 概述,获取相关的客户端连接参数
操作步骤
将下载的 Demo 中的 pythonkafkademo 上传至 Linux 服务器,登录 Linux 服务器,进入 pythonkafkademo 目录。
步骤1:添加 Python 依赖库
执行以下命令安装:
pip install kafka-python
如果使用 python3,执行以下命令安装 python3-pip,然后执行 kafka-python 安装:
yum install -y python3-pippip3 install kafka-python
步骤2:生产消息
1. 注意,必须修改生产消息程序 producer.py 中配置参数。
from kafka import KafkaProducer# 创建生产者producer = KafkaProducer(bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址api_version=(2, 8, 0), # 显式指定协议版本(根据集群实例调整)retries=2147483647, # 重试次数设为 int 最大值(无限重试)retry_backoff_ms=1000, # 重试间隔 1 秒acks=1 # 只等待 leader 确认,不等待所有副本)# 发送消息for i in range(5):msg = f"Message {i}"future = producer.send('$topic_name', value=msg.encode('utf-8')) # $topic_name需要提前在控制台创建,并注意替换result = future.get(timeout=10)print(f"发送成功: '{msg}' -> 分区={result.partition}, 偏移量={result.offset}")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
topic_name | Topic 名称,您可以在控制台上 Topic 列表页面复制。 |
2. 编译并运行 producer.py。
3. 查看运行结果。

4. 在 CKafka 控制台 的 Topic 列表页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。

步骤3:消费消息
1. 修改消费消息程序 consumer.py 中配置参数。
from kafka import KafkaConsumer# 创建消费者consumer = KafkaConsumer('$topic_name', # topic名字bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址group_id='$group_id', # 指定消费组名字auto_offset_reset='earliest', # 从最早消息开始读api_version=(2, 8, 0), # 显式指定协议版本enable_auto_commit=True)print("开始消费消息...")try:for msg in consumer:value = msg.value.decode('utf-8')print(f"收到消息: {value}")except KeyboardInterrupt:print("消费者已停止。")finally:consumer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。 |
group_id | 消费者的组 ID,根据业务需求自定义 |
topic_name | Topic 名称,您可以在控制台上 Topic 列表页面复制。 |
2. 编译并运行 consumer.py。
3. 查看运行结果。

4. 在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。
