VPC 网络接入

最近更新时间:2025-09-29 15:45:22

我的收藏

操作场景

该任务以 Python 客户端为例指导您使用 VPC 网络接入消息队列 CKafka 版并收发消息。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

将下载的 Demo 中的 pythonkafkademo 上传至 Linux 服务器,登录 Linux 服务器,进入 pythonkafkademo 目录。

步骤1:添加 Python 依赖库

执行以下命令安装:
pip install kafka-python
如果使用 python3,执行以下命令安装 python3-pip,然后执行 kafka-python 安装:
yum install -y python3-pip
pip3 install kafka-python

步骤2:生产消息

1. 注意,必须修改生产消息程序 producer.py 中配置参数。
from kafka import KafkaProducer
# 创建生产者
producer = KafkaProducer(
bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址
api_version=(2, 8, 0), # 显式指定协议版本(根据集群实例调整)
retries=2147483647, # 重试次数设为 int 最大值(无限重试)
retry_backoff_ms=1000, # 重试间隔 1 秒
acks=1 # 只等待 leader 确认,不等待所有副本
)

# 发送消息
for i in range(5):
msg = f"Message {i}"
future = producer.send('$topic_name', value=msg.encode('utf-8')) # $topic_name需要提前在控制台创建,并注意替换
result = future.get(timeout=10)
print(f"发送成功: '{msg}' -> 分区={result.partition}, 偏移量={result.offset}")

producer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。
topic_name
Topic 名称,您可以在控制台上 Topic 列表页面复制。
2. 编译并运行 producer.py。
3. 查看运行结果。


4. CKafka 控制台Topic 列表页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。


步骤3:消费消息

1. 修改消费消息程序 consumer.py 中配置参数。
from kafka import KafkaConsumer

# 创建消费者
consumer = KafkaConsumer(
'$topic_name', # topic名字
bootstrap_servers='$domainName:$port', # 替换为你的 Kafka 地址
group_id='$group_id', # 指定消费组名字
auto_offset_reset='earliest', # 从最早消息开始读
api_version=(2, 8, 0), # 显式指定协议版本
enable_auto_commit=True
)

print("开始消费消息...")

try:
for msg in consumer:
value = msg.value.decode('utf-8')
print(f"收到消息: {value}")
except KeyboardInterrupt:
print("消费者已停止。")
finally:
consumer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例基本信息页面的接入方式模块的网络列复制。
group_id
消费者的组 ID,根据业务需求自定义
topic_name
Topic 名称,您可以在控制台上 Topic 列表页面复制。
2. 编译并运行 consumer.py。
3. 查看运行结果。

4. CKafka 控制台Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查看详情,查看消费详情。