文档中心 消息队列 CMQ 快速入门 主题模型快速入门

主题模型快速入门

最近更新时间:2019-03-28 18:43:07

主题发布消息有一个前提,即需要有订阅者订阅主题,如果没有订阅者存在,那么主题中的消息不会被投递,此时发布消息这一操作就失去了意义。

1. 创建主题

    endpoint='' //CMQ 的域名
    secretId ='' // 用户的 ID 和 key
    secretKey = ''
    account = Account(endpoint,secretId,secretKey)
    topicName = 'TopicTest8B'
    my_topic = account.get_topic(topicName)
    topic_meta = TopicMeta()
    my_topic.create(topic_meta)

您可以从控制台查看已创建的主题。其中 QPS = 5000,表示调用同一个接口的频率上限,默认为5000次/s。如果您需要提高 QPS 上限,可以通过 提交工单 反馈给我们。

2. 发布消息

您可以通过 SDK 或控制台两种方式发布消息。

使用 SDK 发送消息

    message = Message()
    message.msgBody = "this is a test message"
    my_topic.publish_message(message)

使用控制台发布消息

Topic 目前支持消息过滤,即消息标签、消息类型,用来区分某个 CMQ 的 Topic 下的消息分类。MQ 允许消费者按照标签对消息进行过滤,确保消费者最终只消费到他关心的消息类型。该功能默认不开启,未开启时,所有消息向所有订阅者发送;增加标签后,订阅者将仅能收到带该标签的信息。消息过滤标签描述了该订阅中消息过滤的标签(标签一致的消息才会被推送)。单个标签不超过16个字符,单个 Message 可最多添加5个标签。
Topic 目前支持标签过滤和 routingKey 过滤两种方式。过滤规则如下图所示。

批量发布消息

    vmsg = []
    for i in range(6):
    message = Message()
    message.msgBody = "this is a test message"
    vmsg.append(message)

    my_topic.batch_publish_message(vmsg)

3. 消息处理

主题发布消息之后,会自动将消息推送给订阅,当推送失败时,有两种重试策略:

  • 退避重试:重试3次,间隔时间为10 - 20s之间的一个随机值,超过3次后,该条消息对于该订阅者丢弃,不会再重试。
  • 衰退指数重试:重试176次,总计重试时间为1天,间隔时间依次为:2^0 , 2^1, …, 512, 512, …, 512秒。默认为衰退指数重试策略。

使用队列处理消息

订阅者可以填写一个 Queue,使用队列来接收发布的消息。

    subscription_name = "subsc-test"
    my_sub = my_account.get_subscription(topic_name, subscription_name)
    subscription_meta = SubscriptionMeta()
    # 填写订阅名称,这里填写队列名称
    subscription_meta.Endpoint = "queue name  "
    subscription_meta.Protocal = "queue"
    my_sub.create(subscription_meta)

使用其他方式处理消息

订阅者也可以不与 Queue 结合,自己来处理消息。详情请参考 投递消息

4. 路由匹配

Binding key 、Routing key 是组合使用的,兼容 rabbitmq topic 匹配模式。发消息时配的 Routing key 是客户端发消息带的, 必须为字符串,不能有匹配符。创建订阅关系时配的 Binding key 是 topic 和订阅者的绑定关系。

使用限制:

  • Binding key 的数量不超过5个。单个 binding key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
  • Routing key 的数量由1个字符串组成。单个 Routing key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。

通配符说明:

  • *(星号):可以替代一个单词(一串连续的字母串)。
  • #(井号):可以匹配一个或多个字符。
  • rabbitmq的特例:routing_key为空字符串时,不匹配 * ,但可以匹配 #。

示例:

  • 订阅者是『1.*.0』,此时消息为『1.任意字符.0』,则订阅者都能收到消息。
  • 订阅者是『1.#.0』,此时消息为『1.2.3.4.4.2.2.0』,则订阅者都能收到消息(消息中间元素随意) 。

使用路由匹配功能

    endpoint='' //CMQ 的域名
    secretId ='' // 用户的 ID 和 key
    secretKey = ''
    account = Account(endpoint,secretId,secretKey)
    topicName = 'TopicTest'
    my_topic = account.get_topic(topicName)
    topic_meta = TopicMeta()
    topic_meta.filterType = =2 //表示消息投递给订阅的时候采用路由匹配。
    //若filterType =1 则表示使用标签过滤。
    my_topic.create(topic_meta)

    subscription_name = "subsc-test"
    my_sub = my_account.get_subscription(topic_name, subscription_name)
    subscription_meta = SubscriptionMeta()
    //填写订阅名称,这里填写队列名称
    subscription_meta.Endpoint = "queue name  "
    subscription_meta.Protocal = "queue"
    subscription_meta.bindingKey=[1.*.0]  //若消息的标签为[1.任意字符.0]则该订阅者都
    //能收到该标签
    my_sub.create(subscription_meta)

发布消息

    message = Message()
    message.msgBody = "this is a test message"
    routingKey = '1.test.0' //该消息会被投递到 my_sub订阅的地址。
    my_topic.publish_message(message)