Python SDK 使用

最近更新时间:2025-11-20 18:07:23

我的收藏

操作场景

本文以调用 Python SDK 为例介绍通过开源 SDK 实现 RocketMQ 5.x 消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

已参考 SDK 概述,获取相关的客户端连接参数

操作步骤

步骤1:准备环境

要求 Python 在 3.7 以上,RocketMQ 的 Python SDK 当前建议版本 5.0.6
pip install rocketmq-python-client==5.0.6

Running Environment

Operating System: Ubuntu 24.04.2 LTS / x86_64

Runtime Version: GNU bash, version 5.2.21(1)-release (x86_64-pc-linux-gnu)

步骤2:生产消息

创建并编译运行生产消息程序。
from rocketmq import ClientConfiguration, Credentials, Message, Producer

if __name__ == '__main__':
endpoints = "rmq-xxxx.rocketmq.gz.qcloud.tencenttdmq.com:8080"
credentials = Credentials()
# if auth enable
credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
topic = "topic"
producer = Producer(config, (topic,))

try:
producer.startup()
try:
msg = Message()
# topic for the current message
msg.topic = topic
msg.body = "hello, rocketmq.".encode('utf-8')
# secondary classifier of message besides topic
msg.tag = "rocketmq-send-message"
# key(s) of the message, another way to mark message besides message id
msg.keys = "send_sync"
# user property for the message
msg.add_property("send", "sync")
res = producer.send(msg)
print(f"{producer.__str__()} send message success. {res}")
producer.shutdown()
print(f"{producer.__str__()} shutdown.")
except Exception as e:
print(f"normal producer example raise exception: {e}")
producer.shutdown()
except Exception as e:
print(f"{producer.__str__()} startup raise exception: {e}")
producer.shutdown()
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
endpoints
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

topicName
Topic 的名称,在控制台 Topic 管理页面复制。

当前开源社区的 Python 客户端生产消息存在一定缺陷,导致同一个 Topic 的不同队列间负载不均,详情可参见 缺陷详情

步骤3:消费消息

创建并编译运行消费消息程序。
from rocketmq import ClientConfiguration, Credentials, SimpleConsumer

if __name__ == '__main__':
endpoints = "rmq-xxxx.rocketmq.gz.qcloud.tencenttdmq.com:8080"
# if auth enable
credentials = Credentials("ak", "sk")
config = ClientConfiguration(endpoints, credentials)
# with namespace
# config = ClientConfiguration(endpoints, credentials, "namespace")
topic = "topic"
# in most case, you don't need to create too many consumers, singleton pattern is recommended
# close the simple consumer when you don't need it anymore
simple_consumer = SimpleConsumer(config, "consumer-group")
try:
simple_consumer.startup()
try:
simple_consumer.subscribe(topic)
# use tag filter
# simple_consumer.subscribe(topic, FilterExpression("tag"))
while True:
try:
# max message num for each long polling and message invisible duration after it is received
messages = simple_consumer.receive(32, 15)
if messages is not None:
print(f"{simple_consumer.__str__()} receive {len(messages)} messages.")
for msg in messages:
simple_consumer.ack(msg)
print(f"{simple_consumer.__str__()} ack message:[{msg.message_id}].")
except Exception as e:
print(f"receive or ack message raise exception: {e}")
except Exception as e:
print(f"{simple_consumer.__str__()} subscribe topic:{topic} raise exception: {e}")
simple_consumer.shutdown()
except Exception as e:
print(f"{simple_consumer.__str__()} startup raise exception: {e}")
simple_consumer.shutdown()
说明:
以下参数需登录 TDMQ RocketMQ 版控制台 获取。
参数
说明
groupName
消费组名称,在控制台 Group 管理页面复制。

endpoints
集群接入地址,控制台集群基本信息页面的接入信息模块获取。

secretKey
角色名称,在控制台的集群权限页面 SecretKey 列复制。

accessKey
角色密钥,在控制台的集群权限页面 AccessKey 列复制。

topicName
Topic 的名称,在控制台 Topic 管理页面复制。


步骤4:查看消息详情

完成消息发送后会得到一个消息 ID (messageID),您可以在控制台的消息查询 > 综合查询页面查询刚刚发送的消息,以及该消息的详情和轨迹等信息。
说明:
上述是对消息的发布和订阅方式的简单介绍,更多操作可参见 DemoRocketMQ-Clients-Python示例