前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq 实现分发消息队列与话题消息队列

rabbitmq 实现分发消息队列与话题消息队列

作者头像
用户3147702
发布2022-06-27 13:19:43
6060
发布2022-06-27 13:19:43
举报
文章被收录于专栏:小脑斧科技博客

1. 概述

上文中,我们介绍了 rabbitmq 的 exchange 的四种模式和点对点连接、发布订阅队列的实现。 本文中,我们介绍分发队列与主题队列的实现,分别使用 exchange 的 direct 模式和 topic 模式。

2. 分发队列

如上文介绍的,direct 模式下,exchange 收到消息后根据 routing-key 将消息转发到对应的队列,因此,queue 需要 bind 到 exchange 并且提供 routing-key。

2.1. php-amqplib

代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('error_log_queue', false, false, false, false);
$channel->queue_declare('warning_log_queue', false, false, false, false);

$channel->exchange_declare('direct_logs', 'direct', false, false, false);
$channel->queue_bind('error_log_queue', 'direct_logs', 'error');
$channel->queue_bind('warning_log_queue', 'direct_logs', 'info');
$channel->queue_bind('warning_log_queue', 'direct_logs', 'warning');

for ($i = 0; $i < 3; $i++) {
    $msg = new AMQPMessage('Hello World! error');
    $channel->basic_publish($msg, 'direct_logs', 'error');
}
$msg = new AMQPMessage('Hello World! warning');
$channel->basic_publish($msg, 'direct_logs', 'warning');
$msg = new AMQPMessage('Hello World! info');
$channel->basic_publish($msg, 'direct_logs', 'info');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
?>

可以看到,error_log_queue 队列中收到了他所关心的 routing-key 为 error 的三条消息,而 warning_log_queue 队列中收到了他所关心的 routing-key 为 info 和 warning 的两条消息。 除了上面展示的一个队列可以设定多个 routing-key,多个不同的队列也可以设置相同的 routing-key,则 exchange 会将符合相应 routing-key 的所有的消息全部都发到所有关心该 routing-key 的队列中。

2.2. AMQP 扩展

下面是相同功能的原生 AMQP 扩展版本的 producer 代码:

代码语言:javascript
复制
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
    'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$error_queue = new AMQPQueue($channel);
$warning_queue = new AMQPQueue($channel);

$ex = new AMQPExchange($channel);
$ex->setName('logs_exchange');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->declareExchange();

$error_queue->setName('error_log_queue');
$warning_queue->setName('warning_log_queue');
$error_queue->declare();
$warning_queue->declare();
$error_queue->bind('logs_exchange', 'error');
$warning_queue->bind('logs_exchange', 'warning');
$warning_queue->bind('logs_exchange', 'info');

for ($i = 0; $i < 3; $i++) {
    $ex->publish('Hello World! error', 'error');
}
$ex->publish('Hello World! error', 'warning');
$ex->publish('Hello World! error', 'info');
echo " [x] Sent 'Hello World!'\n";
?>

3. 话题队列

虽然在实际使用中,topic 队列并没有 direct 队列和发布/订阅消息队列使用的那么多,但是 topic 队列提供了更高的灵活性,在很多场景下可以解决更加复杂的问题,事实上,使用 topic 模式可以在行为上实现其他所有的消息队列模式。 正如我们在之前的日志中所介绍的,所谓的话题,指的就是对 routing-key 的模糊匹配以实现消息的投递。

exchange 对话题的模糊匹配,是以关键词为单位进行的,而不是单个的字符,# 表示 0 个或若干个关键词, 表示一个关键词。 如 log. 可以匹配成功 log.warn 但不能匹配成功 log.warn.timeout,而 #.log.# 可以匹配上述两个,同时,消息的 routing-key 也必须是点分字符串,这样,若干个关键词才能被识别。 他与 direct 模式一样,Queue 必须 bind 到 Exchange 并提供 routing-key。

3.1. php-amqplib

我们使用 php-amqplib 来实际操作一下 topic 队列:

代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('fruts', false, false, false, false);
$channel->queue_declare('lazy_rabbit', false, false, false, false);

$channel->exchange_declare('topicex', 'topic', false, false, false);
$channel->queue_bind('fruts', 'topicex', '*.orange.*');
$channel->queue_bind('lazy_rabbit', 'topicex', '*.*.rabbit');
$channel->queue_bind('lazy_rabbit', 'topicex', 'lazy.#');

for ($i = 0; $i < 3; $i++) {
    $msg = new AMQPMessage('Hello World! busy.orange.rabbit');
    $channel->basic_publish($msg, 'topicex', 'busy.orange.rabbit');
}
$msg = new AMQPMessage('Hello World! lazy.orange.rabbit');
$channel->basic_publish($msg, 'topicex', 'lazy.orange.rabbit');
$msg = new AMQPMessage('Hello World! lazy.apple.rabbit');
$channel->basic_publish($msg, 'topicex', 'lazy.apple.rabbit');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
?>

可以看到,除了 lazy.apple.rabbit 没有匹配 fruts 的通配符而只发送到了 lazy_rabbit 队列,其他消息都发送到了两个队列中。

3.2. AMQP 扩展

下面是原生 AMQP 扩展的相应实现:

代码语言:javascript
复制
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
    'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$fruts = new AMQPQueue($channel);
$lazy_rabbit = new AMQPQueue($channel);

$ex = new AMQPExchange($channel);
$ex->setName('topicex');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();

$fruts->setName('fruts');
$lazy_rabbit->setName('lazy_rabbit');
$fruts->declare();
$lazy_rabbit->declare();
$fruts->bind('topicex', '*.orange.*');
$lazy_rabbit->bind('topicex', '*.*.rabbit');
$lazy_rabbit->bind('topicex', 'lazy.#');

for ($i = 0; $i < 3; $i++) {
    $ex->publish('Hello World! busy.orange.rabbit', 'busy.orange.rabbit');
}
$ex->publish('Hello World! lazy.orange.rabbit', 'lazy.orange.rabbit');
$ex->publish('Hello World! lazy.apple.rabbit', 'lazy.apple.rabbit');
echo " [x] Sent 'Hello World!'\n";
?>
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 分发队列
    • 2.1. php-amqplib
      • 2.2. AMQP 扩展
      • 3. 话题队列
        • 3.1. php-amqplib
          • 3.2. AMQP 扩展
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档