前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 学习(六)---- 路由订阅模型

RabbitMQ 学习(六)---- 路由订阅模型

作者头像
RAIN7
发布2022-10-04 20:24:17
2960
发布2022-10-04 20:24:17
举报

文章目录

RabbitMQ 学习(六)---- 路由订阅模型

1、Direct 路由直连模式

在这里插入图片描述
在这里插入图片描述

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  •   队列与交换机的绑定,不是任意绑定,而是要指定一个RoutingKey(路由key),相当于是一个队列与交换机连接的规则
  •   生产者 在向 Exchange发送消息时,也必须指定消息的 RoutingKey
  •   Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有绑定队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

(1)生产者

  • 声明交换机(exchange)与类型direct
  • 生产者通过交换机(exchange),设置路由规则(routineKey),发送消息

生产者代码

发送信息到 "aaa"交换机下“info” 路由key,"warning"路由key

代码语言:javascript
复制
package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class DProvider {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {
            // 创建信道
            channel = connection.createChannel();

            // 声明交换机以及类型
            channel.exchangeDeclare("aaa", "direct");

            // 设置消息
            String body = "direct模式发送消息";

            // 在信道中 将消息 发送到交换机 同时设置路由规则

            //发送路由为 info的消息
            channel.basicPublish("aaa", "info", null,(body+":info").getBytes() );

            // 发送路由为 warning的消息
            channel.basicPublish("aaa", "warning", null, (body+":warning").getBytes());

        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            RabbitMQUtils.close(channel, connection);
        }
    }
}

(2)消费者

  • 声明交换机与类型(direct),与生产者保持一致
  • 声明临时队列(queue)
  • 临时队列与交换机绑定(queueBind),同时设置路由规则,队列订阅了交换机中指定路由的信息,一个队列可以绑定多个路由,使用多次 queueBind

消费者1

临时队列订阅了 aaa交换机 中"info" 、“warning” 的路由信息

代码语言:javascript
复制
package direct;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class DCustomer1 {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {
            // 创建信道
            channel = connection.createChannel();

            // 声明交换机,与生产者保持一致
            channel.exchangeDeclare("aaa", "direct");

            // 声明临时队列
            String queue = channel.queueDeclare().getQueue();

            // 绑定临时队列 与 交换机,订阅交换机中具体路由规则分发的信息, 交换机和路由规则都是 生产者指定的
            // 如果绑定了路由,那么相当于订阅了消息,一种符合规则的广播
            channel.queueBind(queue, "aaa", "info");
            channel.queueBind(queue, "aaa", "warning");

            // 通过队列 接受消息
            channel.basicConsume(queue, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body)+"    routingkey:"+envelope.getRoutingKey());
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

消费者2

临时队列订阅了 aaa 交换机中 “info” 路由下的信息

代码语言:javascript
复制
package direct;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class DCustomer2 {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {
            // 创建信道
            channel = connection.createChannel();

            // 声明交换机,与生产者保持一致
            channel.exchangeDeclare("aaa", "direct");

            // 声明临时队列
            String queue = channel.queueDeclare().getQueue();

            // 绑定临时队列 与 交换机,订阅交换机中具体路由规则分发的信息, 交换机和路由规则都是 生产者指定的
            // 如果绑定了路由,那么相当于订阅了消息,一种符合规则的广播
            channel.queueBind(queue, "aaa", "info");

            // 通过队列 接受消息
            channel.basicConsume(queue, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body)+"    routinekey:"+envelope.getRoutingKey());
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

(3)效果展示

  消费者1因为 临时队列订阅了 “info”、“warning” 路由,所以生产者的信息都能接收到

在这里插入图片描述
在这里插入图片描述

  消费者2 临时队列因为订阅了 “info” 路由的信息,所有只能接收到 生产者发送的"info"路由中的信息,而 “warning” 路由信息中的信息接收不到。

在这里插入图片描述
在这里插入图片描述

查看后台交换机路由队列绑定的信息

在这里插入图片描述
在这里插入图片描述

查看队列信息,消费者1 的临时队列,接受了交换机路由 “info”、"warning"的信息

在这里插入图片描述
在这里插入图片描述

消费者2的临时队列,接受了交换机路由"info" 的信息

在这里插入图片描述
在这里插入图片描述

2、Topic 路由通配模式

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述
在这里插入图片描述

就一句话,与direct模式就是路由匹配多了一个 使用统配符 的功能

(1)通配符使用规则

. 匹配一个单词 # 匹配零个或多个单词 使用如下 admin.* 匹配 admin.staus、admin.item admin.# 匹配 admin.status.item 、admin

(2)生产者

生产者发送消息到交换机,设置三个不同的路由规则分发消息

routingKey: admin ] message: “生产者 admin 的消息!” routineKey: admin.user message: “生产者 admin.user 的消息!” routineKey: admin.user.name message: “生产者 admin.user.name 的消息!”

代码语言:javascript
复制
package topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;

import java.io.IOException;

public class TProvider {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel =null;
        try {
            // 创建信道
            assert connection != null;
            channel = connection.createChannel();

            // 声明交换机,声明类型Topic
            channel.exchangeDeclare("bbb", "topic");

            // 通过信道,发送交换机设置路由规则
            channel.basicPublish("bbb", "admin.user", null, ("生产者 admin.user 的消息!").getBytes());

            channel.basicPublish("bbb", "admin", null, ("生产者 admin 的消息").getBytes());

            channel.basicPublish("bbb", "admin.user.name", null, ("生产者 admin.user.name 的消息").getBytes());

        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }
    }
}

(3)消费者

  临时队列与交换机绑定(queueBind),同时设置路由规则,不用想direct模式中 需要一个一个规则绑定,直接使用通配符进行绑定即可,可以实现一行语句绑定多个交换机路由

消费者1

临时队列绑定bbb交换机中 admin.* 路由中的消息

代码语言:javascript
复制
package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class TCustomer1 {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();

        try {
            // 创建信道
            Channel channel = connection.createChannel();

            // 声明交换机
            channel.exchangeDeclare("bbb", "topic");

            // 声明临时队列
            String queue = channel.queueDeclare().getQueue();

            // 队列绑定交换机,并订阅路由(使用通配符)
            channel.queueBind(queue, "bbb", "admin.*");

            // 通过队列接收消息
            channel.basicConsume(queue, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body)+"  消费者接收路由通配: admin.*");
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

消费者2

临时队列绑定bbb交换机中 admin.# 路由中的消息

代码语言:javascript
复制
package topic;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class TCustomer2 {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();

        try {
            // 创建信道
            Channel channel = connection.createChannel();

            // 声明交换机
            channel.exchangeDeclare("bbb", "topic");

            // 声明临时队列
            String queue = channel.queueDeclare().getQueue();

            // 队列绑定交换机,并订阅路由(使用通配符)
            channel.queueBind(queue, "bbb", "admin.#");

            // 通过队列接收消息
            channel.basicConsume(queue, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body)+"  消费者接收路由通配: admin.#");
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

(4)效果展示

后台交换机绑定队列与路由key

在这里插入图片描述
在这里插入图片描述

消费者2 队列admin.# 匹配零个或多个单词,生产者路由信息全部匹配

在这里插入图片描述
在这里插入图片描述

消费者1 队列admin.* 匹配一个单词,生产者路由 admin.user 匹配成功

在这里插入图片描述
在这里插入图片描述

消费者1 的接收到的信息

在这里插入图片描述
在这里插入图片描述

消费者2 接收到的信息

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • RabbitMQ 学习(六)---- 路由订阅模型
  • 1、Direct 路由直连模式
    • (1)生产者
      • 生产者代码
    • (2)消费者
      • 消费者1
      • 消费者2
    • (3)效果展示
    • 2、Topic 路由通配模式
      • (1)通配符使用规则
        • (2)生产者
          • (3)消费者
            • 消费者1
            • 消费者2
          • (4)效果展示
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档