单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。 RabbitMQ 单生产单消费模型主要有以下三个角色构成:
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区! ~ 本篇内容包括:RabbitMQ 基本(单生产单消费)消息模型、RabbitMQ 单生产消费模型实现、RabbitMQ 单生产消费模型实现(连接工具类封装)
单生产单消费模型,即基本消息模型或简单消费模型,即完成基本的一对一消息转发。
RabbitMQ 单生产单消费模型主要有以下三个角色构成:
总之:生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区!
channel.queueDeclare("HelloWorld",false,false,false,null);
channel.basicPublish
参数3进行额外设置:MessageProperties.PERSISTENT_TEXT_PLAIN
;# 在 pom.xml 文件中添加以下依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
...
</dependencies>
# Rabbit 基本消息模型 生产者-Producer
package com.lizhengi.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.testng.annotations.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 基本消息模型 生产者
* @date 2022-12-18 5:09 下午
**/
public class Producer {
/**
* 消息生产
*
* @throws IOException IOException
* @throws TimeoutException TimeoutException
*/
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建MQ连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接MQ主机
connectionFactory.setHost("127.0.0.1");
// 设置连接MQ端口号
connectionFactory.setPort(5672);
// 设置连接MQ虚拟主机
connectionFactory.setVirtualHost("/test");
// 设置连接MQ虚拟主机的用户名密码
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
// 获取连接对象
Connection connection = new ConnectionFactory().newConnection();
// 获取连接通道
Channel channel = connection.createChannel();
/* 通道绑定对应消息队列
* 参数1:队列名称,如果不存在则自动创建;
* 参数2:队列特性-是否持久化;
* 参数3:队列特性-是否独占队列;
* 参数4:队列特性-是否队列完成后自动删除队列;
* 参数5:额外附加参数
*/
channel.queueDeclare("HelloWorld", false, false, false, null);
/* 发布消息
* 参数1:交换机名称;参数2:队列名称;参数3:传递消息额外值设置;参数4:具体消息内容
*/
channel.basicPublish("", "HelloWorld", null, "Hello RabbitMQ".getBytes());
// 连接关闭
channel.close();
connection.close();
}
}
# Rabbit 基本消息模型 消费者-Customer
package com.lizhengi.hello;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 基本消息模型 消费者
* @date 2022-12-18 5:19 下午
**/
public class Customer {
public static void main(String[] msg) throws IOException, TimeoutException {
// 创建MQ连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/test");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("123456");
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("HelloWorld", false, false, false, null);
/* 消费消息
* 参数1:消费队列名称;参数2:开始消费的自动确认机制;参数3:消费时的回调接口
*/
channel.basicConsume("HelloWorld", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("The Body:" + new String(body));
}
});
// 关闭连接
channel.close();
connection.close();
}
}
# 连接工具类封装-ConnectionUtil
package com.lizhengi.hello.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author liziheng
* @version 1.0.0
* @description 连接工具类封装
* @date 2022-12-18 5:24 下午
**/
public class ConnectionUtil {
/**
* 创建 MQ 连接工厂对象
*/
private static final ConnectionFactory CONNECTION_FACTORY;
// 类加载时,重量级资源加载
static {
CONNECTION_FACTORY = new ConnectionFactory();
//设置连接MQ主机
CONNECTION_FACTORY.setHost("127.0.0.1");
//设置连接MQ端口号
CONNECTION_FACTORY.setPort(5672);
//设置连接MQ虚拟主机
CONNECTION_FACTORY.setVirtualHost("/test");
//设置连接MQ虚拟主机的用户名密码
CONNECTION_FACTORY.setUsername("admin");
CONNECTION_FACTORY.setPassword("123456");
}
/**
* 建立与RabbitMQ连接 工具方法
*
* @return Connection
*/
public static Connection getConnection() {
try {
//返回连接对象
return CONNECTION_FACTORY.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* 关闭通道和连接 工具方法
*
* @param channel Channel
* @param connection Connection
*/
public static void closeConnectionAndChanel(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
# Rabbit 基本消息模型 生产者-Producer
package com.lizhengi.hello.demo2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 基本消息模型 生产者
* @date 2022-12-18 5:26 下午
**/
public class Producer {
/**
* 消息生产
* @throws IOException IOException
*/
@Test
public void testSendMessage() throws IOException {
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.queueDeclare("HelloWorld",false,false,false,null);
channel.basicPublish("","HelloWorld",null,"Hello RabbitMQ".getBytes());
//资源关闭
ConnectionUtil.closeConnectionAndChanel(channel,connection);
}
}
# Rabbit 基本消息模型 消费者-Customer
package com.lizhengi.hello.demo2;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @author liziheng
* @version 1.0.0
* @description Rabbit 基本消息模型 消费者
* @date 2022-12-18 5:26 下午
**/
public class Customer {
public static void main(String[] msg) throws IOException {
Connection connection = ConnectionUtil.getConnection();
assert connection != null;
Channel channel = connection.createChannel();
channel.queueDeclare("HelloWorld",false,false,false,null);
channel.basicConsume("HelloWorld",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("The Body:" + new String(body));
}
});
}
}