前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rabbitmq exchange 的四种模式

rabbitmq exchange 的四种模式

作者头像
用户3147702
发布2022-06-27 13:20:10
4490
发布2022-06-27 13:20:10
举报
文章被收录于专栏:小脑斧科技博客

1. 概述

在之前的文章中,我们介绍了 AMQP 协议所能实现的各种功能: AMQP 消息服务应用协议

  1. 存储转发(多个消息发送者,单个消息接收者)
  2. 分布式事务(多个消息发送者,多个消息接收者)
  3. 发布订阅(多个消息发送者,多个消息接收者)
  4. 基于内容的路由(多个消息发送者,多个消息接收者)
  5. 文件传输队列(多个消息发送者,多个消息接收者)
  6. 点对点连接(单个消息发送者,单个消息接收者)

本文中,我们就来介绍一下 rabbitmq 的各种用法。 本文以 php 为例,其他语言的用法非常类似。

2. 点对点连接

最基本的模式就是点对点模式,一个生产者向队列中投入消息,一个消费者循环从队列中取数据。

2.1. php-amqplib

  • producer
代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
?>
  • consumer
代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}
?>

这段代码中,producer 向名为 "hello" 的队列中放入消息 "Hello World",consumer 从其中取出消息,这是消息队列最简单的用法。 basic_consume 方法的第一个参数标识队列名称,第四个参数标识是否自动 ack,第七个参数则是收到消息后执行的回调方法。

2.2. Acknowledge

消息队列使用时,如果 consumer 意外退出,那么他没来得及处理的消息会如何处理呢? AMQP 要求消费者需要向队列发送 ACK 消息表示消息已经处理,否则这条消息还会分发给其他 consumer 去处理,以防止消息的丢失。 如果设置了 auto_ack,则 consumer 在收到消息后会立即自动发送 ACK 消息,这样在代码中无需手动发送 ack 消息,但是方便的同时带来了消息丢失的风险。 下面是手动 ack 的 consumer 改进版本:

代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
    sleep(3);
    echo " [x] Done".PHP_EOL;
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('hello', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}
?>

basic_qos 设置了队列的 prefetch_count 属性,它限制了消费者同时能够接收的消息数,设置为 1 也就意味着,在 consumer 手动发送 ack 前,队列不会再将新的消息发送给他。

这样,我们可以不再仅仅用一个 consumer 来进行消费了,我们可以同时启动多个 consumer 来实现队列消息的消费了。

2.3. PHP AMQP 扩展

下面使用 PHP 官方提供的 AMQP 扩展实现上述功能。

  • producer
代码语言:javascript
复制
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
    'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('hello');
$ex = new AMQPExchange($channel);
$ex->setName('helloexchange');
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
$ex->publish('Hello World!', 'hello');
?>
  • consumer
代码语言:javascript
复制
<?php
$conn_args = array('host' => 'localhost', 'port' => 5672,
    'login' => 'guest', 'password' => 'guest');
$connection = new AMQPConnection($conn_args);
if ($connection->connect()) {
    echo "Established a connection to the broker \n";
} else {
    echo "Cannot connect to the broker \n ";
}
$channel = new AMQPChannel($connection);
$queue = new AMQPQueue($channel);
$queue->setName('hello');
$queue->bind('helloexchange', 'hello');
$queue->qos(0, 1);
while (1) {
    if ($message = $queue->get()) {
        echo " [x] Received ", $message->getBody(), "\n";
        sleep(3);
        echo $message->getBody().PHP_EOL;
        echo " [x] Done".PHP_EOL;
        $queue->ack($message->getDeliveryTag());
    }
}
?>

需要注意的是:

  1. AMQPQueue 对象的 get 方法如果以 AMQP_AUTOACK 为参数则会自动发送 ack,无参数版本则需要手动调用 ack 方法发送
  2. AMQPQueue 对象的 qos 方法与上面所说的 basic_qos 方法一样,设置了能够接收的消息大小和消息数,由于 rabbitmq 并没有实现对消息大小的限制,所以这里第一个参数并没有意义,我们设为了 0
  3. 这里涉及到 exchange 的相关概念,我们马上来了解

3. Exchange

在上面的例子中,我们已经看到了 exchange 的创建和使用,此前,在 AMQP 的介绍中,我们也介绍了协议中的 Exchange 正如 AMQP 协议中描述的,producer 是通过 exchange 将消息发送到队列的,exchange 通过消息中的 routing key 决定最终发往的队列。

上面使用 php-amqplib 的例子中,并没有出现 exchange,是因为他自动使用了默认的 exchange amq.direct 实现点对点消息队列。 事实上,producer 是不能将消息发送给队列的,他只能发送给 exchange,由 exchange 决定发送到哪个队列,exchange type 决定了消息的最终处理方式。 Exchange 共有四种 type(模式)可供选择:

  1. direct
  2. fanout
  3. topic
  4. headers

4. Exchange 的四种模式

4.1. direct

direct 方式是最常用也是最简单的方式,当 Exchange 收到消息后,会将消息转发到消息的 routing key 所指定的消息队列中。 这种模式下,queue 需要执行 bind 操作绑定到 Exchange 上并提供绑定的 routing-key。 如果在 vhost 中不存在指定的 routing-key,消息就会被丢弃。

4.2. fanout

fanout 模式就是常用的发布/订阅模式,也称为“路由表”模式。 在这种模式下,Exchange 收到的任何消息都会被转发到所有与该 Exchange 绑定的所有 Queue 上。 因此,在这种模式下 Queue 必须 bind 到 Exchange 才会被通知,进而才能使用,同时,这种模式下不需要 routing-key。 一个 Queue 可以绑定多个 Exchange,一个 Exchange 也可以绑定多个 Queue。 如果 Exchange 并没有绑定任何 Queue,那么消息就会被丢弃。

4.3. topic

这种模式比较复杂,简单的来说,就是 Exchange 会把收到的消息转发到所有关心 routing-key 的 queue 上,Exchange 通过对消息的 routing-key 进行模糊匹配查找到对应的队列。 因此,与 fanout 一样,Queue 必须 bind 到 Exchange,同时与 direct 模式一样,必须指定 routing-key。 当一个 queue 执行 bind 操作绑定到 exchange 时,需要提供他关心的 routing-key,这个 routing-key 字符串可以是一个模糊匹配字符串,# 表示 0 个或若干个关键词, 表示一个关键词,如 log. 可以匹配成功 log.warn 但不能匹配成功 log.warn.timeout,而 #.log.# 可以匹配上述两个。 如果 Exchange 没有发现任何匹配的 Queue,消息就会被丢弃。

4.4. headers

Headers 模式一般很少被用到,他根据消息 header 中的 “x-match” 属性匹配已经绑定的消息队列。

5. 发布/订阅队列

使用上面介绍的 Fanout 模式的 Exchange 就可以实现发布订阅模式的消息队列了,如果使用 Topic 模式则可以实现更加灵活的发布/订阅消息队列实现。

5.1. php-amqplib

  • producer
代码语言:javascript
复制
<?php
require_once __DIR__ . '/../composer/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('access_log', false, false, false, false);
$channel->queue_declare('error_log', false, false, false, false);
$channel->queue_declare('warning_log', false, false, false, false);
$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->queue_bind('access_log', 'logs');
$channel->queue_bind('error_log', 'logs');
$channel->queue_bind('warning_log', 'logs');

for ($i = 0; $i < 6; $i++) {
    $msg = new AMQPMessage('Hello World!');
    $channel->basic_publish($msg, 'logs');

    echo " [x] Sent 'Hello World!'\n";
}

$channel->close();
$connection->close();
?>

这里我们声明了三个队列,并且全部通过 bind 操作绑定到了名为 "logs" 的 Exchange 上,然后发送了 6 条消息到 exchange,可以看到消息,与 logs exchange 绑定的三个队列都收到了 6 条消息。

6. 参考资料

Rabbitmq Tutorial — http://www.rabbitmq.com/tutorials/tutorial-one-php.html。 book.amqp.php — http://php.net/manual/pl/book.amqp.php。 Rabbitmq 消息队列在 PHP 下的应用 — http://www.cnblogs.com/phpinfo/p/4104551.html。 Rabbitmq 三种 Exchange 模式的性能比较 — http://hwcrazy.com/34195c9068c811e38a44000d601c5586/group/free_open_source_project/

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-03-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 点对点连接
    • 2.1. php-amqplib
      • 2.2. Acknowledge
        • 2.3. PHP AMQP 扩展
        • 3. Exchange
        • 4. Exchange 的四种模式
          • 4.1. direct
            • 4.2. fanout
              • 4.3. topic
                • 4.4. headers
                • 5. 发布/订阅队列
                  • 5.1. php-amqplib
                  • 6. 参考资料
                  相关产品与服务
                  消息队列 CMQ 版
                  消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档