上文中,我们介绍了 rabbitmq 的 exchange 的四种模式和点对点连接、发布订阅队列的实现。 本文中,我们介绍分发队列与主题队列的实现,分别使用 exchange 的 direct 模式和 topic 模式。
如上文介绍的,direct 模式下,exchange 收到消息后根据 routing-key 将消息转发到对应的队列,因此,queue 需要 bind 到 exchange 并且提供 routing-key。
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('error_log_queue', false, false, false, false);
$channel->queue_declare('warning_log_queue', false, false, false, false);
$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->queue_bind('error_log_queue', 'direct_logs', 'error');
$channel->queue_bind('warning_log_queue', 'direct_logs', 'info');
$channel->queue_bind('warning_log_queue', 'direct_logs', 'warning');
for ($i = 0; $i < 3; $i++) {
$msg = new AMQPMessage('Hello World! error');
$channel->basic_publish($msg, 'direct_logs', 'error');
}
$msg = new AMQPMessage('Hello World! warning');
$channel->basic_publish($msg, 'direct_logs', 'warning');
$msg = new AMQPMessage('Hello World! info');
$channel->basic_publish($msg, 'direct_logs', 'info');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
?>
可以看到,error_log_queue 队列中收到了他所关心的 routing-key 为 error 的三条消息,而 warning_log_queue 队列中收到了他所关心的 routing-key 为 info 和 warning 的两条消息。 除了上面展示的一个队列可以设定多个 routing-key,多个不同的队列也可以设置相同的 routing-key,则 exchange 会将符合相应 routing-key 的所有的消息全部都发到所有关心该 routing-key 的队列中。
下面是相同功能的原生 AMQP 扩展版本的 producer 代码:
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
echo "Established a connection to the broker \n";
} else {
echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$error_queue = new AMQPQueue($channel);
$warning_queue = new AMQPQueue($channel);
$ex = new AMQPExchange($channel);
$ex->setName('logs_exchange');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->declareExchange();
$error_queue->setName('error_log_queue');
$warning_queue->setName('warning_log_queue');
$error_queue->declare();
$warning_queue->declare();
$error_queue->bind('logs_exchange', 'error');
$warning_queue->bind('logs_exchange', 'warning');
$warning_queue->bind('logs_exchange', 'info');
for ($i = 0; $i < 3; $i++) {
$ex->publish('Hello World! error', 'error');
}
$ex->publish('Hello World! error', 'warning');
$ex->publish('Hello World! error', 'info');
echo " [x] Sent 'Hello World!'\n";
?>
虽然在实际使用中,topic 队列并没有 direct 队列和发布/订阅消息队列使用的那么多,但是 topic 队列提供了更高的灵活性,在很多场景下可以解决更加复杂的问题,事实上,使用 topic 模式可以在行为上实现其他所有的消息队列模式。 正如我们在之前的日志中所介绍的,所谓的话题,指的就是对 routing-key 的模糊匹配以实现消息的投递。
exchange 对话题的模糊匹配,是以关键词为单位进行的,而不是单个的字符,# 表示 0 个或若干个关键词, 表示一个关键词。 如 log. 可以匹配成功 log.warn 但不能匹配成功 log.warn.timeout,而 #.log.# 可以匹配上述两个,同时,消息的 routing-key 也必须是点分字符串,这样,若干个关键词才能被识别。 他与 direct 模式一样,Queue 必须 bind 到 Exchange 并提供 routing-key。
我们使用 php-amqplib 来实际操作一下 topic 队列:
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('fruts', false, false, false, false);
$channel->queue_declare('lazy_rabbit', false, false, false, false);
$channel->exchange_declare('topicex', 'topic', false, false, false);
$channel->queue_bind('fruts', 'topicex', '*.orange.*');
$channel->queue_bind('lazy_rabbit', 'topicex', '*.*.rabbit');
$channel->queue_bind('lazy_rabbit', 'topicex', 'lazy.#');
for ($i = 0; $i < 3; $i++) {
$msg = new AMQPMessage('Hello World! busy.orange.rabbit');
$channel->basic_publish($msg, 'topicex', 'busy.orange.rabbit');
}
$msg = new AMQPMessage('Hello World! lazy.orange.rabbit');
$channel->basic_publish($msg, 'topicex', 'lazy.orange.rabbit');
$msg = new AMQPMessage('Hello World! lazy.apple.rabbit');
$channel->basic_publish($msg, 'topicex', 'lazy.apple.rabbit');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
?>
可以看到,除了 lazy.apple.rabbit 没有匹配 fruts 的通配符而只发送到了 lazy_rabbit 队列,其他消息都发送到了两个队列中。
下面是原生 AMQP 扩展的相应实现:
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
echo "Established a connection to the broker \n";
} else {
echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$fruts = new AMQPQueue($channel);
$lazy_rabbit = new AMQPQueue($channel);
$ex = new AMQPExchange($channel);
$ex->setName('topicex');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$fruts->setName('fruts');
$lazy_rabbit->setName('lazy_rabbit');
$fruts->declare();
$lazy_rabbit->declare();
$fruts->bind('topicex', '*.orange.*');
$lazy_rabbit->bind('topicex', '*.*.rabbit');
$lazy_rabbit->bind('topicex', 'lazy.#');
for ($i = 0; $i < 3; $i++) {
$ex->publish('Hello World! busy.orange.rabbit', 'busy.orange.rabbit');
}
$ex->publish('Hello World! lazy.orange.rabbit', 'lazy.orange.rabbit');
$ex->publish('Hello World! lazy.apple.rabbit', 'lazy.apple.rabbit');
echo " [x] Sent 'Hello World!'\n";
?>