操作场景
该任务以 Python 客户端为例指导您使用 VPC 网络接入消息队列 CKafka 版并收发消息。
前提条件
操作步骤
将下载的 Demo 中的 pythonkafkademo 上传至 Linux 服务器,登录 Linux 服务器,进入 pythonkafkademo 目录。
步骤1:添加 Python 依赖库
执行以下命令安装:
pip install kafka-python
步骤2:生产消息
1. 修改生产消息程序 producer.py 中配置参数。
#coding:utf8from kafka import KafkaProducerimport jsonproducer = KafkaProducer(bootstrap_servers = ['$domainName:$port'],api_version = (0,10,0))message = "Hello World! Hello Ckafka!"msg = json.dumps(message).encode()producer.send('topic_name',value = msg)print("produce message " + message + " success.")producer.close()
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
topic_name | Topic 名称,您可以在控制台上 topic 管理页面复制。 ![]() |
2. 编译并运行 producer.py。
3. 查看运行结果。


4. 在 CKafka 控制台 的 topic 管理页面,选择对应的 Topic,单击更多 > 消息查询,查看刚刚发送的消息。


步骤3:消费消息
1. 修改消费消息程序 consumer.py 中配置参数。
#coding:utf8from kafka import KafkaConsumerconsumer = KafkaConsumer('$topic_name',group_id = "$group_id",bootstrap_servers = ['$domainName:$port'],api_version = (0,10,0))for message in consumer:print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" % (message.topic, message.partition, message.offset, message.value))
参数 | 描述 |
bootstrap_servers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 ![]() |
group_id | 消费者的组 ID,根据业务需求自定义 |
topic_name | Topic 名称,您可以在控制台上 topic 管理页面复制。 ![]() |
2. 编译并运行 consumer.py。
3. 查看运行结果。


4. 在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。

