Python SDK 使用

最近更新时间:2024-04-25 10:32:21

我的收藏

操作场景

本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

步骤1:准备环境

Rocketmq-client Python 基于 rocketmq-client-cpp 进行包装,因此需要先安装 librocketmq
说明:
目前Python客户端仅支持 Linux 和 macOS 操作系统,暂不支持 Windows 系统。
1. 安装 librocketmq (版本2.0.0及以上), 安装教程参见 librocketmq 安装
2. 执行如下命令安装 rocketmq-client-python。
pip install rocketmq-client-python


步骤2:生产消息

创建并编译运行生产消息程序。
from rocketmq.client import Producer, Message

# 初始化生产者,并设置生产组信息,group1
producer = Producer(groupName)
# 设置服务地址
producer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
producer.set_session_credentials(
accessKey, # 角色密钥
secretKey, # 角色名称
''
)
# 启动生产者
producer.start()

# 组装消息 topic名称,在控制台 topic 页面复制。
msg = Message(topicName)
# 设置keys,用户自定义,如果没用,就是空。
KEYS = "YourKey"
msg.set_keys(KEYS)
# 设置tags,用户自定义,如果没用,就是空。
TAGS = "yourTag"
msg.set_tags(TAGS)
# 消息内容
msg.set_body('This is a new message.')

# 发送同步消息
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
# 资源释放
producer.shutdown()
参数
说明
groupName
生产者组名称。在控制台集群管理中 Group tab 中获取。
nameserver
集群接入地址,在集群基本信息中,根据使用需求,使用不同的内网/公网接入地址



secretKey
角色名称,在 集群管理 页面复制 accessSecret 复制。
accessKey
角色密钥,在 集群管理 页面复制 accessKey 复制。



topicName
Topic 的名称,在控制台 Topic 页面复制。
TAGS
用来设置消息的 TAG。
KEYS
设置消息业务 key。
当前开源社区的 Python 客户端生产消息存在一定缺陷,导致同个 Topic 的不同队列间负载不均,详情可参见 缺陷详情

步骤3:消费消息

创建并编译运行消费消息程序。
import time

from rocketmq.client import PushConsumer, ConsumeStatus


# 消息处理回调
def callback(msg):
# 模拟业务
print('Received message. messageId: ', msg.id, ' body: ', msg.body)
# 消费成功回复CONSUME_SUCCESS
return ConsumeStatus.CONSUME_SUCCESS
# 消费成功回复消息状态
# return ConsumeStatus.RECONSUME_LATER


# 初始化消费者,并设置消费者组信息
consumer = PushConsumer(groupName)
# 设置服务地址
consumer.set_name_server_address(nameserver)
# 设置权限(角色名和密钥)
consumer.set_session_credentials(
accessKey, # 角色密钥
secretKey, # 角色名称
''
)
# 订阅topic,默认*订阅,根据需求修改
TAGS = "*"
consumer.subscribe(topicName, callback, TAGS)
print(' [Consumer] Waiting for messages.')
# 启动消费者
consumer.start()

while True:
time.sleep(3600)
# 资源释放
consumer.shutdown()
参数
说明
groupName
消费者 Group 的名称,在控制台 Group 页面复制。
nameserver
同生产地址。
secretKey
同生产消息的获取方式。
accessKey
同生产消息的获取方式。
topicName
Topic 的名称,在控制台 Topic 页面复制。
TAGS
设置订阅消息的tag,默认为"*",表示订阅所有消息。

步骤4:查看消费详情

发送完成消息后会得到一个消息ID (messageID),开发者可以在 “消息查询” 页面查询刚刚发送的消息,如下图所示;并且可以查看特定消息的详情和轨迹等信息,详情请参见 消息查询



说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 DemoRocketMQ-Client-Python示例