RabbitMQ实战4.发布与订阅交换机临时队列发布与订阅功能实现执行结果流程总结参考文档

继上篇 RabbitMQ实战3.公平调度

RabbitMQ并非直接将消息投递到队列中,而是要经过交换机,交换机再与队列绑定。那么,什么是交换机? 如何通过交换机与队列的绑定实现发布与订阅功能?

交换机

先来了解前面课程中用到的交换机

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message)

代码中的exchange参数就是指交换机,为空表示默认交换机或者匿名交换机,这种交换机有个特点,即routing_key路由指定的是队列名称。这句代码可以理解为:RabbitMQ经由默认的交换机将消息投递到task_queue队列中。

交换机还有以下几种类型:

  • 直连交换机(direct)
  • 主题交换机(topic)
  • 头交换机 (headers)
  • 扇型交换机(fanout)

显示交换机列表

☁  rabbitMq [master] ⚡ rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.direct  direct
amq.rabbitmq.trace  topic
amq.topic   topic
amq.match   headers
amq.headers headers
amq.fanout  fanout

本篇主要阐述扇形交换机

临时队列

之前的实战文章中,生产者和消费者都是共用同一个具名的队列。本篇要实现的是发布与订阅功能,即生产者发布消息后,不同消费者是从不同的队列中获取消息。这就不可能在生产者中指定具名的队列名称,因为不可能预先知道有多少个队列。这种情况就需要用临时队列。

临时队列:当我们连接上RabbitMQ的时候,我们需要一个全新的、空的队列。我们可以手动创建一个随机的队列名,或者让服务器为我们选择一个随机的队列名(推荐)。我们只需要在调用queue_declare方法的时候,不提供queue参数就可以了:

result = channel.queue_declare() # 创建了一个临时队列

通过result.method.queue获得已经生成的随机队列名

当与消费者(consumer)断开连接的时候,这个队列应当被立即删除。exclusive标识符即可达到此目的。

result = channel.queue_declare(exclusive=True)

生产者投递消息时不需要指定队列,只需要指定类型为扇形的交换机,扇形交换机会将消息推送到所有的队列中

发布与订阅功能实现

交换机

新建 emit_log.py

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 声明交换机
channel.exchange_declare(exchange='logs',  # 交换机名称
                         exchange_type='fanout')  # 交换机类型

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',  # 指定交换机
                      routing_key='',  # 指定队列,会被fanout交换机覆盖,因此置为空
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

新建 receive_logs.py

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

result = channel.queue_declare(exclusive=True)  # 声明临时队列
queue_name = result.method.queue  # 获取队列名称

# 交换机与队列绑定
channel.queue_bind(exchange='logs',  # 声明要绑定的交换机
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] %r" % body)


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

执行结果

消费者1:直接将结果显示在终端

☁  rabbitMq [master] ⚡ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'

消费者2:将结果写到文件

☁  rabbitMq [master] ⚡ python receive_logs.py > logs_from_rabbit.log

发送消息

☁  rabbitMq [master] ⚡ python emit_log.py
 [x] Sent 'info: Hello World!'

查看队列的绑定信息

☁  rabbitMq [master] ⚡ rabbitmqctl list_bindings | grep logs
logs    exchange    amq.gen-SOqwANvv-M0ACC1lWV_MpA  queue   amq.gen-SOqwANvv-M0ACC1lWV_MpA  []
logs    exchange    amq.gen-eAjX2hrcOsurdyoRYJvmow  queue   amq.gen-eAjX2hrcOsurdyoRYJvmow  []

查看日志文件

☁  rabbitMq [master] ⚡ cat logs_from_rabbit.log
 [*] Waiting for logs. To exit press CTRL+C
 [x] b'info: Hello World!'

流程总结

  1. 定义扇形交换机,发送到所有队列
  2. 消费者声明临时队列,将队列与交换机绑定,生产者发送消息,交换机将消息发送到所有绑定的队列
  3. 不同队列有不同的消费者,每个消费者对应不同的队列

参考文档

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Ken的杂谈

【系统设置】CentOS 修改机器名

17630
来自专栏非著名程序员

这是对付产品经理的一副毒药,程序员慎入

程序员和产品经理的日常就像是一对天生的冤家,为了需求的实现,几乎天天在争吵。这不,就在昨天各大技术和产品群里一个程序员暴打产品经理的视频火了,被广泛传播。

11920
来自专栏web前端教室

你可以从面试中学到什么?

讲一下我对面试的一些。。。“偏见”,哈哈,熟悉我的同学们一定要批判的读接下来的内容哈。

11900
来自专栏腾讯大讲堂的专栏

白底黑字or黑底白字,眼睛更喜欢哪一个?

12010
来自专栏haifeiWu与他朋友们的专栏

复杂业务下向Mysql导入30万条数据代码优化的踩坑记录

从毕业到现在第一次接触到超过30万条数据导入MySQL的场景(有点low),就是在顺丰公司接入我司EMM产品时需要将AD中的员工数据导入MySQL中,因此楼主负...

27240
来自专栏FSociety

SQL中GROUP BY用法示例

GROUP BY我们可以先从字面上来理解,GROUP表示分组,BY后面写字段名,就表示根据哪个字段进行分组,如果有用Excel比较多的话,GROUP BY比较类...

5.1K20
来自专栏微信公众号:小白课代表

不只是软件,在线也可以免费下载百度文库了。

不管是学生,还是职场员工,下载各种文档几乎是不可避免的,各种XXX.docx,XXX.pptx更是家常便饭,人们最常用的就是百度文库,豆丁文库,道客巴巴这些下载...

43530
来自专栏非著名程序员

「我真的没有改需求」

11710
来自专栏腾讯NEXT学位

今天我就说三句话

11320
来自专栏前端桃园

知识体系解决迷茫的你

最近在星球里群里都有小伙伴说道自己对未来的路比较迷茫,一旦闲下来就不知道自己改干啥,今天我这篇文章就是让你觉得一天给你 25 个小时你都不够用,觉得睡觉都是浪费...

20040

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励