RabbitMQ的安装以及五种队列模式
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而群集和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
docker一键安装
# 拉去镜像(后缀为management表示为带图形化管理界面的版本)
docker pull docker.io/rabbitmq:3.8-management
# 启动镜像
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin [ent_id]
# -d 后台运行容器;
# --name 指定容器名;
# -p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
# -v 映射目录或文件;
# --hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
# -e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)
导入依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
代码
工具方法码
//用于返回一个连接
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("testhost");
factory.setUsername("admin");
factory.setPassword("admin");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
生产者
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
消费者
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
生产者
String QUEUE_NAME = "test_queue_work";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
消费者X2
String QUEUE_NAME = "test_queue_work";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [y] Received '" + message + "'");
// 返回确认状态,注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
两种分发模式
默认情况下是使用的轮询分发模式。将上述代码注释移除,并将channel.basicConsume(QUEUE_NAME, false, consumer);
设置为false,则会采用公平分发
PS:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
生产者
// 交换机名称
String EXCHANGE_NAME = "test_exchange_fanout";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
PS:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
消费者
//另一个消费则需要将队列名称换成另外一个例如test_queue_work2,其他代码相同
String QUEUE_NAME = "test_queue_work1";
String EXCHANGE_NAME = "test_exchange_fanout";
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
在
Publish/Subscibe
模式中,所有的消息均会发送到所有的消费者,但是目前有这样一个场景,所有的日志记录必须发送到消费者A,用于记录消息,但是只有错误的日志需要发送到消费者B,这是就需Exchange
有过滤功能 在Routing
模式下,就可以实现这个功能
生产者
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
//声明Exchange
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//分别发送两条消息到"delete"、"insert"渠道
channel.basicPublish(EXCHANGE_NAME,"delete",null,"删除商品".getBytes());
channel.basicPublish(EXCHANGE_NAME,"insert",null,"插入商品".getBytes());
channel.close();
connection.close();
消费者A
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME1,false,false,false,null);
//绑定到交换机,接受"insert"、"delete"两个渠道的消息,也就是最终结果会受到两条消息
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"insert");
channel.queueBind(QUEUE_NAME1,EXCHANGE_NAME,"delete");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME1, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv1] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
消费者B
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME2,false,false,false,null);
//绑定到交换机,只绑定了"delete"渠道,也就是只会受到一条消息
channel.queueBind(QUEUE_NAME2,EXCHANGE_NAME,"delete");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME2, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2] Received '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
主题模式是路由模式的一个升级,在过滤条件上更加灵活 主题模式是将路由键和某个模式进行匹配。此时队列需要绑定一个模式上。
#
匹配一个或多个词,*
匹配不多不少一个词。因此audit.#
能够匹配到audit.irs.corporate
,但是audit.*
只会匹配到audit.irs
生产者
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//发送两条消息
channel.basicPublish(EXCHANGE_NAME, "routkey.1", null, "routkey消息".getBytes());
channel.basicPublish(EXCHANGE_NAME, "common.1", null, "common消息".getBytes());
channel.close();
connection.close();
消费者A
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
//绑定到交换机
channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "routkey.#");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME1, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv1] Received '" + message + "'");
}
消费者B
Connection connection = ConnectUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME2, false, false, false, null);
//绑定到交换机
channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "#.#");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME2, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [Recv2] Received '" + message + "'");
}