操作场景
本文以调用 Python SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
1. 准备环境。
在客户端环境安装 pulsar-client 库,可以使用 pip 进行安装,也可以使用其他方式,参见 Pulsar Python client。
pip install 'pulsar-client==3.1.0'
2. 创建客户端。
# 创建客户端client = pulsar.Client(authentication=pulsar.AuthenticationToken(# 已授权角色密钥AUTHENTICATION),# 服务接入地址service_url=SERVICE_URL)
3. 创建生产者。
# 创建生产者producer = client.create_producer(# topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制topic='pulsar-xxx/sdk_python/topic1')
说明
Topic 名称需要填入完整路径,即
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。4. 发送消息。
# 发送消息producer.send(# 消息内容'Hello python client, this is a msg.'.encode('utf-8'),# 消息参数properties={'k': 'v'},# 业务keypartition_key='yourKey')
还可以使用异步方式发送消息。
# 异步发送回调def send_callback(send_result, msg_id):print('Message published: result:{} msg_id:{}'.format(send_result, msg_id))# 发送消息producer.send_async(# 消息内容'Hello python client, this is a async msg.'.encode('utf-8'),# 异步回调callback=send_callback,# 消息配置properties={'k': 'v'},# 业务keypartition_key='yourKey')
5. 创建消费者。
# 订阅消息consumer = client.subscribe(# topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制topic='pulsar-xxx/sdk_python/topic1',# 订阅名称subscription_name='sub_topic1')
说明
Topic 名称需要填入完整路径,即 
persistent://clusterid/namespace/Topic
,clusterid/namespace/topic
的部分可以从控制台上 Topic管理 页面直接复制。

subscriptionName 需要写入订阅名,可在消费管理界面查看。
6. 消费消息。
# 获取消息msg = consumer.receive()try:# 模拟业务print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))# 消费成功,回复ackconsumer.acknowledge(msg)except:# 消费失败,消息将会重新投递consumer.negative_acknowledge(msg)
7. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。


说明