Python SDK

最近更新时间:2024-07-23 11:08:01

我的收藏

操作场景

本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

步骤1:准备环境

Rocketmq-client Python 基于 rocketmq-client-cpp 进行包装,因此需要先安装 librocketmq
说明:
目前Python客户端仅支持 Linux 和 macOS 操作系统,暂不支持 Windows 系统。
1. 安装 librocketmq (版本2.0.0及以上), 安装教程参见 librocketmq 安装
2. 执行如下命令安装 rocketmq-client-python。
pip install rocketmq-client-python


步骤2:生产消息

创建并编译运行生产消息程序。
from rocketmq.client import Producer, Message

# 初始化生产者,并设置生产组信息,组名称使用全称,例:rocketmq-xxx|namespace_python%group1 生产消息时,客户端无需重复创建。
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
producer.set_session_credentials(
accessKey, # 角色密钥
secretKey, # 角色名称
''
)
# 启动生产者
producer.start()

# 组装消息
# topic 拼接 namespace
msg = Message(namespace + '%' + topicName)
# 设置keys
msg.set_keys(TAGS)
# 设置tags
msg.set_tags(KEYS)
# 消息内容
msg.set_body('This is a new message.')

# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
参数
说明
groupName
生产者组名称。在控制台集群管理中 Group tab 中获取。
nameserver
集群接入地址,在控制台集群管理页面操作列的获取接入地址获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。



secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img


namespace
命名空间名称,在控制台 Namespace 页面复制。
topicName
Topic 的名称,在控制台 Topic 页面复制。
TAGS
用来设置消息的 TAG。
KEYS
设置消息业务 key。
当前开源社区的 Python 客户端生产消息存在一定缺陷,导致同个 Topic 的不同队列间负载不均,详情可参见 缺陷详情

步骤3:消费消息

创建并编译运行消费消息程序。
import time

from rocketmq.client import PushConsumer, ConsumeStatus


# 消息处理回调
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费成功回复消息状态
# return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(namespace + '%' + groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
accessKey, # 角色密钥
secretKey, # 角色名称
''
)
# 订阅topic
consumer.subscribe(namespace + '%' + topicName, callback, TAGS)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
time.sleep(3600)
# 资源释放
consumer.shutdown()
参数
说明
namespace
命名空间名称,在控制台 Namespace 页面复制。
groupName
消费者 Group 的名称,在控制台 Group 页面复制。
nameserver
集群接入地址,在控制台集群管理页面操作列的获取接入地址获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。



secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img


topicName
Topic 的名称,在控制台 Topic 页面复制。
TAGS
设置订阅消息的tag,默认为"*",表示订阅所有消息。

步骤4:查看消费详情

登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看详情,可查看消费者详情。
img


说明:
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoRocketMQ-Client-Python示例