文档中心>消息队列 CMQ>快速入门>主题模型快速入门

主题模型快速入门

最近更新时间:2021-08-04 15:17:52

我的收藏
主题发布消息有一个前提,即需要有订阅者订阅主题,如果没有订阅者存在,那么主题中的消息不会被投递,此时发布消息这一操作就失去了意义。

1. 创建主题

endpoint='' //CMQ 的域名
secretId ='' // 用户的 ID 和 key
secretKey = ''
account = Account(endpoint,secretId,secretKey)
topicName = 'TopicTest8B'
my_topic = account.get_topic(topicName)
topic_meta = TopicMeta()
my_topic.create(topic_meta)
您可以从控制台查看已创建的主题。其中 QPS = 5000,表示调用同一个接口的频率上限,默认为5000次/s。如果您需要提高 QPS 上限,可以通过 提交工单 反馈给我们。




2. 发布消息

您可以通过 SDK 或控制台两种方式发布消息。

使用 SDK 发送消息

message = Message()
message.msgBody = "this is a test message"
my_topic.publish_message(message)

使用控制台发布消息

Topic 目前支持消息过滤,即消息标签、消息类型,用来区分某个 CMQ 的 Topic 下的消息分类。MQ 允许消费者按照标签对消息进行过滤,确保消费者最终只消费到他关心的消息类型。该功能默认不开启,未开启时,所有消息向所有订阅者发送;增加标签后,订阅者将仅能收到带该标签的信息。消息过滤标签描述了该订阅中消息过滤的标签(标签一致的消息才会被推送)。单个标签不超过16个字符,单个 Message 可最多添加5个标签。 Topic 目前支持标签过滤和 routingKey 过滤两种方式。过滤规则如下图所示:



批量发布消息

vmsg = []
for i in range(6):
message = Message()
message.msgBody = "this is a test message"
vmsg.append(message)

my_topic.batch_publish_message(vmsg)

3. 消息处理

主题发布消息之后,会自动将消息推送给订阅,当推送失败时,有两种重试策略:
退避重试:重试3次,间隔时间为10 - 20s之间的一个随机值,超过3次后,该条消息对于该订阅者丢弃,不会再重试。
衰退指数重试:重试176次,总计重试时间为1天,间隔时间依次为:2^0 , 2^1, …, 512, 512, …, 512秒。默认为衰退指数重试策略。

使用队列处理消息

订阅者可以填写一个 Queue,使用队列来接收发布的消息。
subscription_name = "subsc-test"
my_sub = my_account.get_subscription(topic_name, subscription_name)
subscription_meta = SubscriptionMeta()
# 填写订阅名称,这里填写队列名称
subscription_meta.Endpoint = "queue name "
subscription_meta.Protocal = "queue"
my_sub.create(subscription_meta)

使用其他方式处理消息

订阅者也可以不与 Queue 结合,自己来处理消息。详情请参考 投递消息

4. 路由匹配

Binding key 、Routing key 是组合使用的,兼容 rabbitmq topic 匹配模式。发消息时配的 Routing key 是客户端发消息带的, 必须为字符串,不能有匹配符。创建订阅关系时配的 Binding key 是 topic 和订阅者的绑定关系。
使用限制:
Binding key 的数量不超过5个。单个 binding key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
Routing key 的数量由1个字符串组成。单个 Routing key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
通配符说明:
*(星号):可以替代一个单词(一串连续的字母串)。
#(井号):可以匹配一个或多个字符。
rabbitmq的特例:routing_key为空字符串时,不匹配 * ,但可以匹配 #。
示例:
订阅者是『1.*.0』,此时消息为『1.任意字符.0』,则订阅者都能收到消息。
订阅者是『1.#.0』,此时消息为『1.2.3.4.4.2.2.0』,则订阅者都能收到消息(消息中间元素随意) 。

使用路由匹配功能

endpoint='' //CMQ 的域名
secretId ='' // 用户的 ID 和 key
secretKey = ''
account = Account(endpoint,secretId,secretKey)
topicName = 'TopicTest'
my_topic = account.get_topic(topicName)
topic_meta = TopicMeta()
topic_meta.filterType =2 //表示消息投递给订阅的时候采用路由匹配
//若 filterType =1 则表示使用标签过滤
my_topic.create(topic_meta)

subscription_name = "subsc-test"
my_sub = my_account.get_subscription(topic_name, subscription_name)
subscription_meta = SubscriptionMeta()
//填写订阅名称,这里填写队列名称
subscription_meta.Endpoint = "queue name "
subscription_meta.Protocal = "queue"
subscription_meta.bindingKey=[1.*.0] //若消息的标签为[1.任意字符.0],则该订阅者都能收到该标签
my_sub.create(subscription_meta)

发布消息

message = Message()
message.msgBody = "this is a test message"
message.routingKey = '1.test.0' //该消息会被投递到 my_sub 订阅的地址
my_topic.publish_message(message)