前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 订阅模型-路由模式

RabbitMQ 订阅模型-路由模式

作者头像
栗筝i
发布2022-12-29 15:49:50
6860
发布2022-12-29 15:49:50
举报
文章被收录于专栏:迁移内容

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。 在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。

文章目录


一、RabbitMQ 订阅模型-路由(Direct)模式

1、RabbitMQ 路由(direct)模式

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。
2、路由(direct)模式组成

RabbitMQ 订阅模型-路由模式(Fanout)模式主要有以下六个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • Routing Key 路由关键字,exchange 根据这个关键字进行消息投递,Exchange 接收到的消息会带有 RoutingKey 这个字段,Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 BindingKey 做匹配,如果满足要求,就往 BindingKey 所绑定的 Queue 发送消息,这样我们就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue 的过程
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

二、RabbitMQ 订阅模型-路由(Direct)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>
2、封装工具类 ConnectionUtil
代码语言:javascript
复制
package com.lizhengi.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-26 11:39 上午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        //设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        //设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        //设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        //设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            //返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3、生产者实现
代码语言:javascript
复制
package com.lizhengi.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由模式(Direct)模式 生产者
 * @date 2022-12-26 3:43 下午
 **/
public class Producer {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        /*  为通道声明交换机
         *  参数1:交换机名称;参数2:交换机类型
         */
        channel.exchangeDeclare("logs_direct", "direct");
        //发送消息
        String routingKey = "error";
//        String routingKey = "info";
        channel.basicPublish("logs_direct", routingKey, null, ("The message's routingKey is " + routingKey + " !").getBytes());

        //释放资源
        ConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}
4、消费者-1 实现
代码语言:javascript
复制
package com.lizhengi.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由(Direct)模式 消费者
 * @date 2022-12-26 3:46 下午
 **/

public class Customer1 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs_direct", "direct");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列和routingKey
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
        channel.queueBind(queueName, "logs_direct", "error");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}
5、消费者-2 实现
代码语言:javascript
复制
package com.lizhengi.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由(Direct)模式 消费者
 * @date 2022-12-26 3:47 下午
 **/
public class Customer2 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs_direct", "direct");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列和routingKey
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
        channel.queueBind(queueName, "logs_direct", "info");
        channel.queueBind(queueName, "logs_direct", "error");
        channel.queueBind(queueName, "logs_direct", "warning");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 一、RabbitMQ 订阅模型-路由(Direct)模式
    • 1、RabbitMQ 路由(direct)模式
      • 2、路由(direct)模式组成
      • 二、RabbitMQ 订阅模型-路由(Direct)模式实现
        • 1、添加 Maven 依赖
          • 2、封装工具类 ConnectionUtil
            • 3、生产者实现
              • 4、消费者-1 实现
                • 5、消费者-2 实现
                • 三、订阅模型 三种模式区别
                  • 1、RabbitMQ 消息订阅(Fanout)模式
                    • 2、RabbitMQ 路由(direct)模式
                      • 3、RabbitMQ 主题(topic)模式
                      相关产品与服务
                      轻量应用服务器
                      轻量应用服务器(TencentCloud Lighthouse)是新一代开箱即用、面向轻量应用场景的云服务器产品,助力中小企业和开发者便捷高效的在云端构建网站、Web应用、小程序/小游戏、游戏服、电商应用、云盘/图床和开发测试环境,相比普通云服务器更加简单易用且更贴近应用,以套餐形式整体售卖云资源并提供高带宽流量包,将热门软件打包实现一键构建应用,提供极简上云体验。
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档