Python SDK

最近更新时间:2024-12-17 17:42:22

我的收藏

操作步骤

本文以 Python SDK 为例介绍客户端接入 TDMQ CMQ 版服务并收发消息的操作步骤。

前提条件

队列模型

操作步骤

1. 在控制台创建符合需求的队列,参见 创建队列服务
说明
创建消息队列可在控制台手动创建,或通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装
shell
python
pip install --upgrade tencentcloud-sdk-python
# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 创建tdmq客户端
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

# 创建cmq队列请求参数
req = models.CreateCmqQueueRequest()
params = {
"QueueName": "queue_api",
# 下面是死信队列相关配置
"DeadLetterQueueName": "dead_queue_api", # 死信队列,该消息队列要先创建
"Policy": 0, # 0为消息被多次消费未删除,1为Time-To-Live过期
"MaxReceiveCount": 3 # 最大接收次数 1-1000
}
req.from_json_string(json.dumps(params))

# 创建cmq消息队列
resp = client.CreateCmqQueue(req)

参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。
img


2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
3. 发送消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.queue import Message
from cmq.cmq_exception import CMQExceptionBase

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.cloud.tencent.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

# 初始化 my_account, my_queue
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
# 消息队列名称
queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
my_queue = my_account.get_queue(queue_name)

try:
# 消息内容
msg_body = "I am test message."
msg = Message(msg_body)
# 发送消息
re_msg = my_queue.send_message(msg)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (msg_body, re_msg.msgId))
except CMQExceptionBase as e:
print("Send Message Fail! Exception:%s\\n" % e)
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。
img


queue_name
队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。
4. 消费消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.cmq_exception import CMQExceptionBase

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.cloud.tencent.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

# 初始化 my_account, my_queue
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
my_queue = my_account.get_queue(queue_name)

try:
wait_seconds = 3
# 获取消息
recv_msg = my_queue.receive_message(wait_seconds)
# 具体业务
print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
recv_msg.receiptHandle, recv_msg.msgBody, recv_msg.msgId))
# 消费成功,删除消息
my_queue.delete_message(recv_msg.receiptHandle)
except CMQExceptionBase as e:
print("Receive Message Fail! Exception:%s\\n" % e)
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
img


queue
队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。


主题模型

操作步骤

1. 准备所需资源,创建主题订阅和订阅者。
1.1 创建主题订阅。可通过控制台手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装
# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

req = models.CreateCmqTopicRequest()
params = {
"TopicName": "topic_api", # 主题名字,在单个地域同一帐号下唯一
"FilterType": 1, # 用于指定主题的消息匹配策略。1:表示标签匹配策略;2:表示路由匹配策略
"MsgRetentionSeconds": 86400 # 消息保存时间。取值范围60 - 86400 s(即1分钟 - 1天)
}
req.from_json_string(json.dumps(params))

# 创建topic
resp = client.CreateCmqTopic(req)
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
img


1.2 创建订阅者。可通过控制台进行手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装
# api认证信息
cred = credential.Credential(SecretId, SecretKey)
httpProfile = HttpProfile()
httpProfile.endpoint = NameServerAddress

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)

req = models.CreateCmqSubscribeRequest()
params = {
"TopicName": "topic_api", # 创建订阅的topic名称
"SubscriptionName": "sub", # 订阅名称
"Protocol": "queue", # 订阅的协议,目前支持两种协议:http、queue。使用http协议,用户需自己搭建接受消息的web server。使用queue,消息会自动推送到CMQ queue,用户可以并发地拉取消息。
"Endpoint": "topic_queue_api", # 接收通知的Endpoint,根据协议Protocol区分:对于http,Endpoint必须以“http://”开头,host可以是域名或IP;对于Queue,则填QueueName。
"NotifyStrategy": "BACKOFF_RETRY", # CMQ推送服务器的重试策略。取值有:1)BACKOFF_RETRY,退避重试。;2)EXPONENTIAL_DECAY_RETRY,指数衰退重试。
"FilterTag": ["TAG"], # 消息标签(用于消息过滤)。标签数量不能超过5个
# "BindingKey": ["a.b.c"], # BindingKey数量不超过5个, 每个BindingKey长度不超过64字节,该字段表示订阅接收消息的过滤策略
"NotifyContentFormat": "SIMPLIFIED" # 推送内容的格式。取值:1)JSON;2)SIMPLIFIED,即raw格式。如果Protocol是queue,则取值必须为SIMPLIFIED。如果Protocol是http,两个值均可以,默认值是JSON。
}
req.from_json_string(json.dumps(params))

# 创建订阅
resp = client.CreateCmqSubscribe(req)
注意
BindingKey 与 FilterTag 要根据所订阅topic类型进行设置,否则无效。
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
img


2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
3. 创建 my_topic,用来发布消息。
import os
import sys

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")

import logging
from cmq.account import Account
from cmq.cmq_exception import *
from cmq.topic import *

# 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
# 密钥可前往https://console.cloud.tencent.com/cam/capi网站进行获取
secretId = 'AKIDSiiRtxxxx'
secretKey = 'GGzSeaM5xxxx'
# CMQ的服务调用地址
nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'

try:
# 初始化 my_account
# Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
my_account.set_log_level(logging.DEBUG)
# topic主题名称
topic_name = sys.argv[1] if len(sys.argv) > 1 else "python_topic_route"
my_topic = my_account.get_topic(topic_name)
except CMQExceptionBase as e:
print("Exception:%s\\n" % e)
参数
说明
NameServerAddress
API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
img


SecretId、SecretKey
云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
img


topic_name
主题订阅名称,在 TDMQ CMQ 版控制台 的主题订阅列表页面获取。
4. 发送 TAG 类型消息。
# 消息tag
tags = ["TAG", "TAG1", "TAG2"]
for tag in tags:
# 发送tag消息
message = Message("this is a test TAG message. TAG:" + tag, [tag])
re_msg = my_topic.publish_message(message)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))
5. 发送 route 消息。
# 消息route信息
routes = ["a.b.c", "a.b.x", "a.c.d", "x.y.z", "x.y.c"]
for route in routes:
message = Message("this is a test route message. Route:" + route)
# 发送route消息
re_msg = my_topic.publish_message(message, route)
# 发送结果
print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))
6. 消费者消费订阅者订阅的消息队列即可。
说明
以上是 CMQ 两种模型下的生产和消费方式的简单介绍,更多使用可参见 DemoCMQ 代码仓库