只有在安装和配置软件后,才能使用RabbitMQ发送和接收消息,安装教程可以参考CentOS安装RabbitMQ的教程。
本文中的所有示例都是使用Python语言提供的,该语言使用处理AMQP消息传递协议的puka库进行备份。但由于AMQP是一种广泛采用的协议,因此任何其他编程语言都可以实现类似的目标。
可以使用 Python pip
包管理器快速安装puka。
pip install puka
pip并不总是与Linux发行版捆绑在一起。在基于Debian的发行版(包括Ubuntu)上,可以使用以下命令轻松安装:
apt-get install python-pip
基于RHEL,如CentOS:
yum install python-setuptools
easy_install pip
消息传递特指RabbitMQ介绍了一些描述消息代理及其机制的基本原理的术语。
本文将使用上述五个术语。还有一个与puka python库严格相关的库,其被作为首选库。这可以理解为对AMQP服务器的同步请求,可以保证请求的执行(无论是否成功)以及决定在完成请求之前所等待的客户端。
虽然puka可以异步工作,但在我们的示例中,puka将用作同步库。这意味着在每次请求(承诺)之后,puka将持续等待直到下一步执行前。关于RabbitMQ的更多基本概念详情请参考腾讯云+社区。
要测试消息代理和puka是否工作正常,并掌握发送和接收消息在实践中的工作方式,请创建一个名为的示例python脚本 rabbit_test.py
vim rabbit_test.py
粘贴脚本内容:
import puka
# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")
# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)
# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)
# declare queue (queue must exist before it is being used - otherwise messages sent to that queue will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)
# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)
print "Message sent!"
# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)
print "Starting receiving!"
while True:
received_message = consumer.wait(receive_promise)
print "GOT: %r" % (received_message['body'],)
break
按:wq保存文件并退出。
运行脚本会打印脚本并发送到RabbitMQ队列,测试程序会在之后立即收到消息。
输出应如下所示:
root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#
让我们来看一下此代码中发生的情况:
localhost
的同一个RabbitMQ服务器上在前面的示例中,无名exchange将消息传递到名为“rabbit”的特定队列。无名exchange需要队列名称才能工作,这意味着它只能将消息传递给单个队列。
RabbitMQ中还有其他类型的交换,其中一个是fanout,这是我们在本文中的主要关注点。fanout交换是一种简单的blind工具,可以将消息传递给它所知道的所有队列。通过fanout交换,不需要提供特定的队列名称。在生成消息之前,将发送到该类交换的消息传递到绑定到交换的所有队列。可以连接到交换机的队列数量没有限制。
通过fanout交换,我们可以轻松创建发布/订阅模式。生产者定期向他们可能不知道的用户发送消息(制作消息并将其发送到fanout exchange)。新订阅者订阅业务通讯(将自己的队列绑定到同一个简报fanout),从业务通讯fanout交换将向所有注册用户(队列)发送消息。
虽然一对一的消息传递非常简单,开发人员经常使用其他通信手段,一对多(其中“多”是不明确的,可以之间的任何数和批次)是一种非常流行的方案,其中的消息代理可以提供巨大的帮助。
生产者应用程序的唯一作用是创建一个fanout exchange,并为该交换产生周期性消息(每隔几秒)。其将自动生成消息。此应用程序将充当业务通讯。
创建一个名为newsletter_produce.py
的python脚本
vim newsletter_produce.py
并粘贴脚本内容:
import puka
import datetime
import time
# declare and connect a producer
producer = puka.Client("amqp://localhost/")
connect_promise = producer.connect()
producer.wait(connect_promise)
# create a fanout exchange
exchange_promise = producer.exchange_declare(exchange='newsletter', type='fanout')
producer.wait(exchange_promise)
# send current time in a loop
while True:
message = "%s" % datetime.datetime.now()
message_promise = producer.basic_publish(exchange='newsletter', routing_key='', body=message)
producer.wait(message_promise)
print "SENT: %s" % message
time.sleep(1)
producer.close()
让我们一步一步地用例子来解释代码中发生的事情。
newsletter
交换所生成具有当前时间的消息。请注意,它的routing_key
是空的,这意味着没有指定特定的队列。交换机将进一步向正确的队列传递消息。该应用程序在运行时会将当前时间通知所有的业务订阅者。
接收者应用程序将创建一个临时队列并将其绑定到命名的fanout交换。之后,它将开始等待消息。在将队列绑定到交换机之后,由此消费者接收由之前创建的生产者发送的每条消息。此应用程序将充当订阅者- 可以一次多次运行应用程序,但仍然所有实例都将接收广播消息。
创建一个名为newsletter_consume.py的python脚本
vim newsletter_consume.py
并粘贴脚本内容:
import puka
# declare and connect a consumer
consumer = puka.Client("amqp://localhost/")
connect_promise = consumer.connect()
consumer.wait(connect_promise)
# create temporary queue
queue_promise = consumer.queue_declare(exclusive=True)
queue = consumer.wait(queue_promise)['queue']
# bind the queue to newsletter exchange
bind_promise = consumer.queue_bind(exchange='newsletter', queue=queue)
consumer.wait(bind_promise)
# start waiting for messages on the queue created beforehand and print them out
message_promise = consumer.basic_consume(queue=queue, no_ack=True)
while True:
message = consumer.wait(message_promise)
print "GOT: %r" % message['body']
consumer.close()
接收者代码比生产者代码复杂一点。让我们一步一步地看一下:
newsletter
exchange。从那一刻起,fanout exchange将把每条消息传递到该队列。该应用程序在运行时会从业务通讯处收到时间通知。它可以一次执行多次,此应用程序的每个实例都将获得当前时间。
要测试业务通讯及其使用者,请打开与虚拟服务器的多个SSH会话(如果在本地计算机上工作,打开多个终端窗口)。
在其中一个窗口中运行生产者应用程序。
root@rabbitmq:~# python newsletter_produce.py
它将开始显示当前时间:
SENT: 2014-02-11 17:24:47.309000
SENT: 2014-02-11 17:24:48.310000
SENT: 2014-02-11 17:24:49.312000
SENT: 2014-02-11 17:24:50.316000
...
在每个其他窗口中运行接收者应用程序:
root@rabbitmq:~# python newsletter_consume.py
此应用程序的每个实例都将收到生产者广播的时间通知:
GOT: 2014-02-11 17:24:47.309000
GOT: 2014-02-11 17:24:48.310000
GOT: 2014-02-11 17:24:49.312000
GOT: 2014-02-11 17:24:50.316000
...
这意味着RabbitMQ正确注册了fanout交换,将订户队列绑定到此交换,并将发送的消息传递到正确的队列。换句话说,RabbitMQ正在按预期工作。
发布/订阅是一种简单的(在概念上和实现中)消息传递模式,通常可以派上用场; 但RabbitMQ可以做到更多。有许多方法可以使用RabbitMQ来解决消息传递问题,包括高级消息路由,消息确认,安全性或持久性。
本文的主要目标是使用简单的示例介绍基本的消息传递概念
参考文献:《How To Use RabbitMQ and Python's Puka to Deliver Messages to Multiple Consumers》
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。