Python SDK

最近更新时间:2025-06-10 16:17:03

我的收藏

操作场景

本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

步骤1:准备环境

Rocketmq-client Python 基于 rocketmq-client-cpp 进行包装,因此需要先安装 librocketmq
说明:
目前 Python 客户端仅支持 Linux 和 macOS 操作系统,暂不支持 Windows 系统。
在使用 Python SDK 时要注意安装的 Python 支持的底层芯片架构类型(x86 或是 ARM),例如使用 '64bit','ELF'(即 x86_64 架构) 的 Python 版本,则使用 ARM 芯片的 macOS 系统会出现报错。
1. 安装 librocketmq (版本2.0.0及以上), 安装教程参见 librocketmq 安装
2. 执行如下命令安装 rocketmq-client-python。
pip install rocketmq-client-python


步骤2:生产消息

创建并编译运行生产消息程序。
from rocketmq.client import Producer, Message # 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1 生产消息时,客户端无需重复创建。 producer = Producer(groupName) # 设置服务地址 producer.set_name_server_address(nameserver) # 设置权限(角色名和密钥) producer.set_session_credentials( accessKey, # 角色密钥 secretKey, # 角色名称 '' ) # 启动生产者 producer.start() # 组装消息 # topic 拼接 namespace msg = Message(namespace + '%' + topicName) # 设置keys msg.set_keys(TAGS) # 设置tags msg.set_tags(KEYS) # 消息内容 msg.set_body('This is a new message.') # 发送同步消息 ret = producer.send_sync(msg) print(ret.status, ret.msg_id, ret.offset) # 资源释放 producer.shutdown()
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
groupName
生产者组名称,在控制台 Group 管理页面复制。
4.x虚拟集群/专享集群:此处需拼接命名空间名称,格式为namespace全称%group名称,例如 MQ_INSTxxx_aaa%GroupTest。
4.x通用集群/5.x集群:此处无需拼接,填写 Group 名称即可。

nameserver
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。
img


namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。

topicName
Topic 的名称,在控制台 Topic 管理页面复制。

TAGS
用来设置消息的 TAG。
KEYS
设置消息业务 key。
当前开源社区的 Python 客户端生产消息存在一定缺陷,导致同个 Topic 的不同队列间负载不均,详情可参见 缺陷详情

步骤3:消费消息

创建并编译运行消费消息程序。
import time

from rocketmq.client import PushConsumer, ConsumeStatus


# 消息处理回调
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费成功回复消息状态
# return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(namespace + '%' + groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
accessKey, # 角色密钥
secretKey, # 角色名称
''
)
# 订阅topic
consumer.subscribe(namespace + '%' + topicName, callback, TAGS)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
time.sleep(3600)
# 资源释放
consumer.shutdown()
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
namespace
命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。
groupName
生产者组名称,在控制台 Group 管理页面复制。

nameserver
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。
accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。
img


topicName
Topic 的名称,在控制台 Topic 管理页面复制。


步骤4. 查看消息详情

发送完成消息后会得到一个消息 ID (messageID),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。

说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoRocketMQ-Client-Python示例