前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >webman使用RabbitMQ消息中间件实现系统异步解耦实战教程

webman使用RabbitMQ消息中间件实现系统异步解耦实战教程

作者头像
Tinywan
发布2024-06-18 15:38:55
1850
发布2024-06-18 15:38:55
举报
文章被收录于专栏:开源技术小栈

unsetunset简介unsetunset

RabbitMQ是一个开源的消息代理软件,它使用高级消息队列协议(AMQP)来实现消息的发送和接收。RabbitMQ支持多种消息协议,包括STOMP、MQTT等,并且能够与多种编程语言和平台集成,如Java、.NET、Python等。

AMQP 即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

基本架构设计

图片来源:https://blog.csdn.net/cuierdan/article/details/123824300

基本概念

  • Message Broker:(消息代理服务器)是一个虚拟的概念,而RabbitMQ是Message Broker的一个实例。
  • Producer:(生产者)产生数据并将数据发送到消息代理服务器(Message Broker)的程序被称作消息的生产者。
  • Connection:(连接)生产者与消费者通过TCP协议与消息代理服务器(Message Broker)创建的连接。
  • Channel:(信道)创建在Connection中的虚拟连接,类似于连接数据库时的连接池的概念,生产者和消费者并不是直接与MQ通过Connection进行通讯的,而是通过Channel进行连接通讯的,数据的流动是在Channel中进行的。
  • VirtualHost:(虚拟消息服务器)就像mysql数据库中有数据库实例的概念,并且可以指定用户对库和表等操作的设置权限。也可以类别成LINUX系统中的不同用户,不同用户之间是相互独立的。每个VirtualHost相当于一个相对独立mini的RabbitMQ服务器。每个VirtutalHost之间是相互隔离的,exchange,queue,message等不能互通。
  • Exchange:(交换机)交换机直接与Channel(信道)连接,接收来自于消息生产者产生的数据,在由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ的交换机有fanout(扇出),direct(直接),topic(主题),headers(标题)四种类型,每种交换机类型都对应着不同的路由规则,根据不同的路由规则,交换机会将消息路由到不同的队列中。
  • Binding: (绑定)交换机与队列之间的虚拟连接,在这个绑定中可以设置Binding Key,一个绑定就是用一个Binding Key将交换器和队列连接起来,设置的Binding Key存在着一定的规则,Exchange会将消息中携带的Routing Key与Binding Key 中设置的规则进行匹配,将消息发送到相应的队列中。Binding信息被保存到Exchange中的查询表中,用于Exchange将消息分发到队列的依据。
  • Routing Key:(路由键)用于匹配路由规则的依据,生产者在将消息发送到Exchange时,一般会指定一个Routing Key,交换机会根据Routing Key 来匹配Binding中设置的路由规则,将符合规则的消息发送到指定的队列中。
  • Queue:(消息队列)RabbitMQ中的内部对象用于存放消息的容器,RabbitMQ会将消息按照RabbitMQ的六大模式中的一种将队列中的消息发送给消费者,RabbitMQ会根据选择模式的不同将队列中的消息发送给一个或多个消费者,在连接到消费者之前,消息一直在等待消费者到队列中将消息取走。
  • Consumer:(消费者)消息的消费者,表示一个从队列中取消息的应用程序。

特点

  1. 可靠性:RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
  2. 灵活的路由:在消息进入队列之前,通过交换器来路由消息。
  3. 扩展性:多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点。
  4. 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队 列仍然可用。
  5. 多种协议:RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息 中间件协议。
  6. 支持多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript 等。

主要功能

  1. 消息队列:允许应用程序将消息发送到队列中,然后由另一个应用程序从队列中取出并处理。
  2. 消息路由:支持将消息从发送者路由到一个或多个接收者。
  3. 消息持久化:确保消息在系统故障后不会丢失。
  4. 消息确认:确保消息被正确处理,如果处理失败,可以重新发送。
  5. 集群:支持在多个节点上运行,以提供高可用性和负载均衡。

unsetunset安装unsetunset

这里使用1Panel安装,1Panel 是一个现代化、开源的 Linux 服务器运维管理面板

应用商店

安装镜像

注:这里需要勾选【端口外部访问】,方便本地调试

镜像

安装成功后,就可以在镜像中找到已安装好的RabbitMQ镜像容器

外网可访问地址:http://{{你的公网ip}}:15672,如果是在云服务器,记得安全策略放开端口15672

  • RabbitMQ管理界面端口15672。是一个Web应用程序,用于管理和监控RabbitMQ消息代理
  • AMQP默认端口5672。是一种网络协议,用于在应用程序之间传递消息,通常用于消息队列系统。

控制台

登陆控制台,账号和密码都是rabbitmq

unsetunset使用unsetunset

这里使用webman插件RabbitMQ客户端,插件地址:https://www.workerman.net/plugin/67

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

非常感谢兔子大佬的插件贡献!🌻

插件特点

  • 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
  • 支持延迟队列(RabbitMQ须安装插件);
  • 异步无阻塞消费、异步无阻塞生产、同步阻塞生产

安装插件

通过composer包管理安装

代码语言:javascript
复制
composer require workbunny/webman-rabbitmq

注:安装该插件前,请确保你已经安装好webman框架,相关安装文档:https://www.workerman.net/doc/webman/install.html

配置

插件所有配置文件路径:config/plugin/workbunny/webman-rabbitmq/app.php

代码语言:javascript
复制
<?php
return [
    'enable' => true,
    'host'               => '120.120.120.74',
    'vhost'              => '/',
    'port'               => 5672,
    'username'           => 'rabbitmq',
    'password'           => 'rabbitmq',
    'mechanisms'         => 'AMQPLAIN',
    ...
];
  • host 修改为服务器公网ip
  • port 修改为15672

消费者

这里使用命令创建一个拥有单进程消费者的RestyBuilder

代码语言:javascript
复制
./webman workbunny:rabbitmq-builder resty --mode=queue

ℹ️ Config updated.
ℹ️ Builder created.
✅ Builder RestyBuilder created successfully.

创建完成后完整的消费者文件位置process/workbunny/rabbitmq/RestyBuilder.php

代码语言:javascript
复制
<?php declare(strict_types=1);

namespace process\workbunny\rabbitmq;

use Bunny\Channel as BunnyChannel;
use Bunny\Async\Client as BunnyClient;
use Bunny\Message as BunnyMessage;
use Workbunny\WebmanRabbitMQ\Constants;
use Workbunny\WebmanRabbitMQ\Builders\QueueBuilder;

class RestyBuilder extends QueueBuilder
{
    /**
     * @var array = [
     *   'name'           => 'example',
     *   'delayed'        => false,
     *   'prefetch_count' => 1,
     *   'prefetch_size'  => 0,
     *   'is_global'      => false,
     *   'routing_key'    => '',
     * ]
     */
    protected array $queueConfig = [
        // 队列名称 ,默认由类名自动生成
        'name'           => 'process.workbunny.rabbitmq.RestyBuilder',
        // 是否延迟          
        'delayed'        => false,
        // QOS 数量
        'prefetch_count' => 0,
        // QOS size 
        'prefetch_size'  => 0,
        // QOS 全局
        'is_global'      => false,
        // 路由键
        'routing_key'    => '',
    ];
    
    /** @var string 交换机类型 */
    protected string $exchangeType = Constants::DIRECT;
    
    /** @var string|null 交换机名称,默认由类名自动生成 */
    protected ?string $exchangeName = 'process.workbunny.rabbitmq.RestyBuilder';
    
    /** @inheritDoc */
    public function handler(BunnyMessage $message, BunnyChannel $channel, BunnyClient $client): string 
    {
        echo '[RabbitMQ][队列消费] Tag:' .$message->consumerTag. PHP_EOL;
        echo '[RabbitMQ][队列消费着] Content:' .$message->content. PHP_EOL;
        echo '[RabbitMQ][队列消费着] Exchange:' .$message->exchange. PHP_EOL;
        return Constants::ACK;
    }
}

启动webman

代码语言:javascript
复制
php start.php start
Workerman[start.php] start in DEBUG mode
----------------------------------------------------------------------- WORKERMAN -----------------------------------------------------------------------
Workerman version:4.1.15          PHP version:8.2.18           Event-Loop:\Workerman\Events\Event
------------------------------------------------------------------------ WORKERS ------------------------------------------------------------------------
proto   user            worker                                                                      listen                 processes    status           
tcp     root            webman                                                                      http://0.0.0.0:8217    2             [OK]            
tcp     root            monitor                                                                     none                   1             [OK]            
tcp     root            plugin.workbunny.webman-rabbitmq.process.workbunny.rabbitmq.RestyBuilder    none                   1             [OK]            
---------------------------------------------------------------------------------------------------------------------------------------------------------

生产者

代码语言:javascript
复制
use function Workbunny\WebmanRabbitMQ\sync_publish;
use process\workbunny\rabbitmq\RestyBuilder;

sync_publish(RestyBuilder::instance(), '兔子大佬你好呀!'); # return bool

消费结果

代码语言:javascript
复制
[RabbitMQ][队列消费] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][队列消费着] Content:开源技术小栈你好呀!
[RabbitMQ][队列消费着] Exchange:process.workbunny.rabbitmq.RestyBuilder

...

[RabbitMQ][队列消费] Tag:process.workbunny.rabbitmq.RestyBuilder
[RabbitMQ][队列消费着] Content:兔子大佬你好呀!
[RabbitMQ][队列消费着] Exchange:process.workbunny.rabbitmq.RestyBuilder

unsetunsetRabbitMQ管理界面unsetunset

通过RabbitMQ管理界面端发送消息

消费者消费情况

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • unsetunset简介unsetunset
    • 基本架构设计
      • 基本概念
        • 特点
          • 主要功能
          • unsetunset安装unsetunset
            • 应用商店
              • 安装镜像
                • 镜像
                  • 控制台
                  • unsetunset使用unsetunset
                    • 插件特点
                      • 安装插件
                        • 配置
                          • 消费者
                            • 启动webman
                              • 生产者
                                • 消费结果
                                • unsetunsetRabbitMQ管理界面unsetunset
                                相关产品与服务
                                领券
                                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档