端口 | 描述 |
---|---|
4369 | 是 Erlang 的端口/节点名称映射程序,用来跟踪节点名称监听地址,在集群中起到一个类似 DNS 的作用。 |
5672、5671 | AMQP 0-9-1 和 1.0 客户端端口,没有使用 SSL 和使用 SSL 的端口。 |
25672 | 用于 RabbitMQ 节点间和 CLI 工具通信,配合 4369 使用。 |
15672 | HTTP_API 端口,管理员用户才能访问,用于管理 RabbitMQ,需要启用 management 插件。 |
61613、61614 | 当 STOMP 插件启用的时候打开,作为 STOME 客户端端口(根据是否使用 TLS 选择)。 |
1883、8883 | 当 MQTT 插件启用的时候打开,作为 MQTT 客户端端口(根据是否使用 TLS 选择) |
15674 | 基于 WebSocket 的 STOMP 客户端端口(当插件 Web STOMP启用的时候打开)。 |
15675 | 基于 WebSocket 的 MQTT 客户端端口(当插件 Web MQTT 启用的时候打开)。 |
none、management、policymaker、monitoring、administrator
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
/**
* 简单队列生产者
* 使用 RabbitMQ 的默认交换器发送消息
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接属性
factory.setHost("114.67.85.157");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3. 从连接工厂获取连接
connection = factory.newConnection("生产者");
// 4. 从连接中创建通道
channel = connection.createChannel();
/**
* 5、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
* 并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
* 一般在队列和交换器绑定时使用
* @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
* @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
// 消息内容
String message = "Hello World!";
// 6. 发送消息
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息已发送!");
} finally {
// 7. 关闭通道
if (channel != null && channel.isOpen()) {
channel.close();
}
// 8. 关闭连接
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
/**
* 简单队列消费者
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.67.85.157");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("消费者");
channel = connection.createChannel();
channel.queueDeclare("queue1", false, false, false, null);
// 定义收到消息后的回调
DeliverCallback callback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
}
};
// 监听队列
channel.basicConsume("queue1", true, callback, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
System.out.println("开始接收消息!");
System.in.read();
} finally {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
queueDeclare
、交换机的声明 exchangeDeclare
、队列的绑定 queueBind
、发布消息 basicPublish
、消费消息 basicConsume
等。
RabbitMQ 常用交换器类型:fanout、direct、topic、headers 四种。
/**
* Topic--生产者
* 生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置连接属性
factory.setHost("114.67.85.157");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
// 3. 从连接工厂创建连接
connection = factory.newConnection("生产者");
// 4. 从连接中创建通道
channel = connection.createChannel();
// 路由关系如下:com.# --> queue-1 *.order.* --> queue-2
// 消息内容
String message = "Hello A";
// 发送消息到 topic_test 交换器上
channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
System.out.println("消息 " + message + "已发送!");
// 消息内容
message = "Hello B";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
// 消息内容
message = "Hello C";
// 发送消息到topic_test交换器上
channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
System.out.println("消息 " + message + " 已发送!");
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 7、关闭通道
if (channel != null && channel.isOpen()) {
channel.close();
}
// 8、关闭连接
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
/**
* 路由--消费者
* 消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息
*/
public class Consumer {
private static Runnable receive = () -> {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("114.67.85.157");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = null;
Channel channel = null;
final String queueName = Thread.currentThread().getName();
try {
connection = factory.newConnection("消费者");
channel = connection.createChannel();
DeliverCallback callback = (consumerTag, message) -> {
System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
};
channel.basicConsume(queueName, true, callback, consumerTag -> {
});
System.out.println(queueName + " 开始接收消息");
System.in.read();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) {
new Thread(receive, "queue-1").start();
new Thread(receive, "queue-2").start();
}
}