订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。 在 Direct 模型下:
订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。
在 Direct 模型下:
RabbitMQ 订阅模型-路由模式(Fanout)模式主要有以下六个角色构成:
# 在 pom.xml 文件中添加以下依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
...
</dependencies>
package com.lizhengi.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author liziheng
* @version 1.0.0
* @description 连接工具类封装
* @date 2022-12-26 11:39 上午
**/
public class ConnectionUtil {
/**
* 创建 MQ 连接工厂对象
*/
private static final ConnectionFactory CONNECTION_FACTORY;
// 类加载时,重量级资源加载
static {
CONNECTION_FACTORY = new ConnectionFactory();
//设置连接MQ主机
CONNECTION_FACTORY.setHost("127.0.0.1");
//设置连接MQ端口号
CONNECTION_FACTORY.setPort(5672);
//设置连接MQ虚拟主机
CONNECTION_FACTORY.setVirtualHost("/test");
//设置连接MQ虚拟主机的用户名密码
CONNECTION_FACTORY.setUsername("admin");
CONNECTION_FACTORY.setPassword("123456");
}
/**
* 建立与RabbitMQ连接 工具方法
*
* @return Connection
*/
public static Connection getConnection() {
try {
//返回连接对象
return CONNECTION_FACTORY.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 关闭通道和连接 工具方法
*
* @param channel Channel
* @param connection Connection
*/
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.lizhengi.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description 订阅模型-路由模式(Direct)模式 生产者
* @date 2022-12-26 3:43 下午
**/
public class Producer {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
/* 为通道声明交换机
* 参数1:交换机名称;参数2:交换机类型
*/
channel.exchangeDeclare("logs_direct", "direct");
//发送消息
String routingKey = "error";
// String routingKey = "info";
channel.basicPublish("logs_direct", routingKey, null, ("The message's routingKey is " + routingKey + " !").getBytes());
//释放资源
ConnectionUtil.closeConnectionAndChanel(channel, connection);
}
}
package com.lizhengi.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description 订阅模型-路由(Direct)模式 消费者
* @date 2022-12-26 3:46 下午
**/
public class Customer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct", "direct");
// 为单个 Customer 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列和routingKey
// 参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
channel.queueBind(queueName, "logs_direct", "error");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消费者1:" + new String(body));
}
});
}
}
package com.lizhengi.direct;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description 订阅模型-路由(Direct)模式 消费者
* @date 2022-12-26 3:47 下午
**/
public class Customer2 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
// 绑定交换机
channel.exchangeDeclare("logs_direct", "direct");
// 为单个 Customer 创建临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列和routingKey
// 参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
channel.queueBind(queueName, "logs_direct", "info");
channel.queueBind(queueName, "logs_direct", "error");
channel.queueBind(queueName, "logs_direct", "warning");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("消费者1:" + new String(body));
}
});
}
}
RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。
这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。
RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。
在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。
发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。
选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。