MQ(Message Quene):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间的解耦。别名为消息中间件通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发的RocketMQ等。
# 1. ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界称为老牌的消息中间件,在中小企业颇受欢迎。
# 2. Kafka
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复,丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。
# 3. RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化。目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
# 4. RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景、对性能和吞吐量的要求还在其次。
RabbitMQ 比 Kafka 可靠,Kafka 更适合 IO 高吞吐量的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。
基于 AMQP 协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。 AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
外链:https://wwe.lanzous.com/b01c671oh 密码:666
上传3个包到指定目录后,执行命令
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm
然后复制配置文件模板到指定目录(如果不指定位置可以使用find命令)
find / -name rabbitmq.config.example
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
进入指定目录后,修改配置文件
cd /etc/rabbitmq/
vim rabbitmq.config
如下:
# 启动rabbitmq插件的管理控制(Web界面)
rabbitmq-plugins enable rabbitmq_management
# 停止、启动、重启
systemctl stop rabbitmq-server
systemctl start rabbitmq-server
systemctl restart rabbitmq-server
# 查看运行状态
systemctl status rabbitmq-server
# 查看运行状态
systemctl status firewalld
# 禁用、停止
systemctl disable firewalld
systemctl stop firewalld
访问地址:http://192.168.176.130:15672
账号/密码:guest
# 1. 服务启动相关
systemctl start|restart|stop|status rabbitmq-server
# 2. 管理命令行 用来在不使用web管理界面情况下命令操作RabbitMQ
rabbitmqctl help #可以查看更多命令
# 3. 插件管理命令行
rabbitmq-plugins enable|list|disable
AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。
工作过程:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
注意:必须 / 开头
以ems用户为例
在上图的模型中,有以下概念:
package helloword;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/05 14:53
* @Version 1.0
*/
public class Provider {
// 生产消息
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接mq主机
connectionFactory.setHost("192.168.176.130");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 获取连接对象
Connection connection = connectionFactory.newConnection();
// 获取连接中通道对象
Channel channel = connection.createChannel();
// 通道绑定对应消息队列
// 参数1:队列名称,如果队列不存在会自动创建
// 参数2:用来定义队列特性是否要持久化
// 参数3:是否独占队列,表示只有当前连接可用该队列
// 参数4:是否在消费完成后自动删除队列
// 参数5:额外附加参数
channel.queueDeclare("hello", false, false, false, null);
// 发布消息
// 参数1:交换器名称
// 参数2:队列名称
// 参数3:传递消息额外设置
// 参数4:消息的具体内容
channel.basicPublish("", "hello", null, "hello rabbitmq".getBytes());
// 关闭
channel.close();
connection.close();
}
}
package helloword;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/05 15:08
* @Version 1.0
*/
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置连接mq主机
connectionFactory.setHost("192.168.176.130");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 通道绑定对应消息队列
// 参数1:队列名称,如果队列不存在会自动创建
// 参数2:用来定义队列特性是否要持久化
// 参数3:是否独占队列,表示只有当前连接可用该队列
// 参数4:是否在消费完成后自动删除队列
// 参数5:额外附加参数
Connection connection = connectionFactory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道绑定对应消息队列(注意参数要与生产者对应)
// 参数1:队列名称,如果队列不存在会自动创建
// 参数2:用来定义队列特性是否要持久化
// 参数3:是否独占队列,表示只有当前连接可用该队列
// 参数4:是否在消费完成后自动删除队列
// 参数5:额外附加参数
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
// 参数1:消息哪个队列的消息
// 参数2:开始消息的自动确认机制
// 参数3:消费时的回调接口
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
// 参数:body 消息队里中取出的消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("输出队列中的内容>>>>>>>>>" + new String(body));
}
});
// 如果没关闭,消费者会一直等待
// 关闭,不会等结果出来(不推荐)
//channel.close();
//connection.close();
}
}
package utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Description: 工具类
* @Author Ray
* @Date 2021/03/05 16:20
* @Version 1.0
*/
public class RabbitMQUtils {
private static ConnectionFactory connectionFactory;
// 类加载时候只执行一次
static {
connectionFactory = new ConnectionFactory();
}
// 定义提供连接对象的方法
public static Connection getConnection() {
try {
// 设置连接mq主机
connectionFactory.setHost("192.168.176.130");
// 设置端口号
connectionFactory.setPort(5672);
// 设置连接哪个虚拟主机
connectionFactory.setVirtualHost("/ems");
// 设置访问虚拟主机的用户名和密码
connectionFactory.setUsername("ems");
connectionFactory.setPassword("ems");
// 通道绑定对应消息队列
return connectionFactory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
// 关闭通道和关闭连接的方法
public static void closeChanelAndConnection(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Work queues
,也被称为(Task queues
),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
package workqueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/06 9:07
* @Version 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
for (int i = 1; i <= 20; i++) {
// 生产消息
String message = "hello work queue-" + i;
channel.basicPublish("", "work", null, message.getBytes());
}
// 关闭
RabbitMQUtils.closeChanelAndConnection(channel, connection);
}
}
消费者1:工作效率比较高
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者1
* @Author Ray
* @Date 2021/03/06 9:13
* @Version 1.0
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
// 参数1:队列名称
// 参数2:消息自动确认,true表示消费者自动向MQ确认消息消费
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 --- " + new String(body));
}
});
}
}
消费者2:工作效率比较低
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者1
* @Author Ray
* @Date 2021/03/06 9:13
* @Version 1.0
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 通过通道声明队列
channel.queueDeclare("work", true, false, false, null);
// 消费消息
// 参数1:队列名称
// 参数2:消息自动确认,true表示消费者自动向MQ确认消息消费
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2 --- " + new String(body));
}
});
}
}
可以看到,消费者1瞬间完成任务,但是消费者2每2秒才完成一个任务,但是任务是平均分配的,不满足我们日常使用,应该要能者多劳。
完成一项任务可能需要几秒钟。 您可能想知道如果 一位消费者开始了一项漫长的任务,而死于其中的一部分。使用我们当前的代码,RabbitMQ一旦向消费者传递了一条消息 立即将其标记为删除。 在这种情况下,如果您杀死工人 我们将丢失正在处理的消息。我们也会失去所有 发送给该特定工作人员但未发送的消息 尚未处理。 但是我们不想丢失任何任务。 如果一个工人死了,我们希望 任务要交付给另一名工人。 为了确保消息永不丢失,RabbitMQ支持 消息确认 。 确认由主机发回。消费者告诉RabbitMQ已经收到了特定的消息,处理后,RabbitMQ可以自由删除它。
改造消费者1、2
// 每一次只能消费一个消息
channel.basicQos(1);
channel.basicConsume("work", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 --- " + new String(body));
// 手动确认 参数1:手动确认消息标识 参数2:false每次确认一个
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
如果我们将其中一个手动确认注释掉,观察MQ管理器会发生什么情况呢?
// channel.basicAck(envelope.getDeliveryTag(), false);
所以通常我们都会在消费者完成消费后,手动确认消息已消费。
默认情况下,RabbitMQ会将每条消息按顺序发送给下一个使用者。 平均每个消费者将获得相同数量的消息。 这种分发消息的方式称为循环。
如果我们想要做到能者多劳,就要改变消息自动确认机制,改为手动确认消息。
Fanout 也称为广播
RabbitMQ消息传递模型中的核心思想是生产者 从不直接将任何消息发送到队列。
在广播模式下,消息发送流程是这样的:
生产者只能将消息发送到交换机 。如果交换机不存在,则会自动创建。
package fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/06 11:17
* @Version 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 将通道声明指定的交换机
// 参数1:交换机名称
// 参数2:交换机类型 fanout表示广播
channel.exchangeDeclare("logs", "fanout");
// 发送消息
// 参数1:交换机名称
String message = "fanout type message";
channel.basicPublish("logs", "", null, message.getBytes());
// 关闭
RabbitMQUtils.closeChanelAndConnection(channel, connection);
}
}
注意:交换机类型有几种可用: direct , topic , headers 和 fanout 。 我们将演示最后一个-fanout。
fanout交换非常简单。 您可能会从中猜到名称,它只是将收到的所有消息广播给所有知道的队列。
假设有三个相同的消费者1、2、3
package fanout;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者1
* @Author Ray
* @Date 2021/03/06 11:25
* @Version 1.0
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
// 获取通道对象
Channel channel = connection.createChannel();
// 通道绑定交换机
channel.exchangeDeclare("logs", "fanout");
// 临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
// 参数1:临时队列名称
// 参数2:交换机名称
channel.queueBind(queueName, "logs", "");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 --- " + new String(body));
}
});
}
}
首先,无论何时我们连接到Rabbit,我们都需要一个全新的空队列。 为此,我们可以使用随机名称创建一个队列,或者甚至更好-让服务器为我们选择一个随机队列名称。
其次,一旦我们断开与消费者的联系,队列应该是自动删除。
在Java客户端中,当我们不向提供任何参数时 queueDeclare() 我们使用生成的名称创建一个非持久的,排他的,自动删除队列:
String queueName = channel.queueDeclare().getQueue();
此时, queueName 包含一个随机队列名称。
Routing之订阅模型-Direct(直连)
在Fanout(广播)模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
RoutingKey
(路由key)RoutingKey
RoutingKey
进行判断,只有队列的RoutingKey
与消息的RoutingKey
完全一致,才会接收到消息图解:
RoutingKey
RoutingKey
完全匹配的队列RoutingKey
为error的消息RoutingKey
为info、error、waring的消息生产者需要通过通道声明交换机并指定交换机类型,通过routingKey
决定消息发送到哪个消息队列。
package direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/06 14:11
* @Version 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 通过通道声明交换机
// 参数1:交换机名称
// 参数2:交换机类型 direct表示路由模式
channel.exchangeDeclare(exchangeName, "direct");
// 发送消息
//String routingKey = "error";
String routingKey = "info";
String message = "这是direct模型发布的基于routerKey: [" + routingKey + "] 发送的消息";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
// 关闭
RabbitMQUtils.closeChanelAndConnection(channel, connection);
}
}
消费者1:基于routerKey绑定队列和交换机,只接收error消息。
package direct;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者1
* @Author Ray
* @Date 2021/03/06 14:16
* @Version 1.0
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 通道声明交换机以及交换机的类型
channel.exchangeDeclare(exchangeName, "direct");
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 基于routerKey绑定队列和交换机
channel.queueBind(queueName, exchangeName, "error");
// 获取消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 >>>>> " + new String(body));
}
});
}
}
消费者2:基于routerKey绑定队列和交换机,接收info、error、waring消息。
package direct;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者2
* @Author Ray
* @Date 2021/03/06 14:19
* @Version 1.0
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 声明交换机以及交换机类型
channel.exchangeDeclare(exchangeName, "direct");
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 临时队列和交换机的绑定
channel.queueBind(queueName, exchangeName, "info");
channel.queueBind(queueName, exchangeName, "error");
channel.queueBind(queueName, exchangeName, "waring");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 >>> " + new String(body));
}
});
}
}
生产者发送两次消息,分别是info和error消息
可以看到,消费者1只关心error消息,不关心其他消息,而消费者2关心info和error消息。假设发送的是debug,则两个消费者都不会关心。
Routing之订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符。
这种类型RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert
。
# 通配符:
1. * (star) can substitute for exactly one word. 匹配一个词
2. # (hash) can substitute for zero or more words. 匹配一个或多个词
# 例如:
audit.* 只能匹配audit.irs
audit.# 可以匹配audit.irs.corporate 或者 audit.irs 等
生产者发送消息时,routerKey可以指定规则。
package topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/06 14:58
* @Version 1.0
*/
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明交换机以及交换机类型
channel.exchangeDeclare("topics", "topic");
// 发布消息
String routerKey = "user.save";
//String routerKey = "user.save.findAll";
String message = "这里是topic动态路由模型,routerKey: [" + routerKey + "] 发送的消息";
channel.basicPublish("topics", routerKey, null, message.getBytes());
// 关闭
RabbitMQUtils.closeChanelAndConnection(channel, connection);
}
}
消费者1:使用“*”匹配,只能匹配一个词
package topic;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者1
* @Author Ray
* @Date 2021/03/06 15:03
* @Version 1.0
*/
public class Consumer1 {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机以及交换机类型
channel.exchangeDeclare("topics", "topic");
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列和交换机,基于动态通配符形式
// * 只能匹配一个词
channel.queueBind(queueName, "topics", "user.*");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 >>>> " + new String(body));
}
});
}
}
消费者2:使用“#”匹配,可以匹配一个或多个词
package topic;
import com.rabbitmq.client.*;
import utils.RabbitMQUtils;
import java.io.IOException;
/**
* @Description: 消费者2
* @Author Ray
* @Date 2021/03/06 15:03
* @Version 1.0
*/
public class Consumer2 {
public static void main(String[] args) throws IOException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机以及交换机类型
channel.exchangeDeclare("topics", "topic");
// 创建一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列和交换机,基于动态通配符形式
// # 匹配一个或多个词
channel.queueBind(queueName, "topics", "user.#");
// 消费消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 >>>> " + new String(body));
}
});
}
}
可以看到,这相当于是第四种模型的增强版,通过不同的匹配规则,消费者1和消费者2接收的消息是不一样的。
<!-- 引入mq集成依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 192.168.176.130
port: 5672
username: ems
password: ems
virtual-host: /ems
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 1. hello world
@Test
public void testHelloWorld() {
rabbitTemplate.convertAndSend("hello", "hello world");
}
}
注意:如果没有消费者,生产者就没用了,看不到效果。
package com.ray.hello;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/09 10:33
* @Version 1.0
*/
@Component // 默认Queue:持久化 非独占 不是自动删除队列
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "true", autoDelete = "true"))
public class HelloConsumer {
// @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理
// 具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型
@RabbitHandler
public void receive(String message) {
System.out.println("message = " + message);
}
}
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 2. work queue
@Test
public void testWorkQueue() {
// 默认公平消费
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work", "work 模型 " + i);
}
}
}
package com.ray.work;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/09 10:48
* @Version 1.0
*/
@Component
public class WorkConsumer {
// 消费者1
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("message1 = " + message);
}
// 消费者2
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
注意:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 3. fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout 模型");
}
}
package com.ray.fanout;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/09 11:42
* @Version 1.0
*/
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout") // 绑定交换机
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "logs", type = "fanout") // 绑定交换机
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
执行结果
message2 = Fanout 模型
message1 = Fanout 模型
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 4. route 路由模式
@Test
public void testRoute() {
//rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息");
rabbitTemplate.convertAndSend("directs", "error", "发送error的key的路由信息");
}
}
package com.ray.route;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/09 12:04
* @Version 1.0
*/
@Component
public class RouteConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), // 指定交换机名称和类型
key = {"info", "error", "warn"} // 接收对应的消息
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), // 指定交换机名称和类型
key = {"error"} // 接收对应的消息
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
执行结果
message2 = 发送error的key的路由信息
message1 = 发送error的key的路由信息
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 5. topic 动态路由 订阅模式
@Test
public void testTopic() {
//rabbitTemplate.convertAndSend("topics", "user.save", "user.save 路由消息");
//rabbitTemplate.convertAndSend("topics", "order", "order 路由消息");
rabbitTemplate.convertAndSend("topics", "produce.save.add", "produce.save.add 路由消息");
}
}
package com.ray.topic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description: 消费者
* @Author Ray
* @Date 2021/03/09 12:09
* @Version 1.0
*/
@Component
public class TopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(type = "topic", name = "topics"), // 指定交换机名称和类型
key = {"user.save", "user.*", }
)
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(type = "topic", name = "topics"), // 指定交换机名称和类型
key = {"order.#", "produce.#", "user.*"}
)
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
执行结果
message2 = produce.save.add 路由消息
package com.ray;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* @Description: 生产者
* @Author Ray
* @Date 2021/03/09 10:22
* @Version 1.0
*/
@SpringBootTest(classes = RabbitmqDay2Application.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
// 注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
// 1. hello world
@Test
public void testHelloWorld() {
rabbitTemplate.convertAndSend("hello", "hello world");
}
// 2. work queue
@Test
public void testWorkQueue() {
// 默认公平消费
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work", "work 模型 " + i);
}
}
// 3. fanout 广播
@Test
public void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout 模型");
}
// 4. route 路由模式
@Test
public void testRoute() {
//rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息");
rabbitTemplate.convertAndSend("directs", "error", "发送error的key的路由信息");
}
// 5. topic 动态路由 订阅模式
@Test
public void testTopic() {
//rabbitTemplate.convertAndSend("topics", "user.save", "user.save 路由消息");
//rabbitTemplate.convertAndSend("topics", "order", "order 路由消息");
rabbitTemplate.convertAndSend("topics", "produce.save.add", "produce.save.add 路由消息");
}
}
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种:1. 串行的方式 2. 并行的方式
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间。
场景:双11是购物狂欢节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口
这样做法有一个缺点:
当库存系统出现故障时,订单就会失败。订单系统和库存系统高耦合,所以要引入消息队列
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列 作用:
默认情况下:RabbitMQ代理操作所需的所有数据、状态都将跨所有节点复制。这方面的一个例外是消息队列。默认情况下,消息队列位于一个节点上,尽管它们可以从所有节点看到和访问。
核心解决问题:当集群中某一时刻master节点宕机后,可以对Quene中消息,进行备份
# 0. 集群规划
node1: 192.168.176.3 mq1 master 主节点
node2: 192.168.176.4 mq2 repl1 副本节点
node3: 192.168.176.5 mq3 repl2 副本节点
镜像队列机制就是将队列在三个节点之间设置主从关系,消息会在三个节点之间进行自动同步,且如果其中一个节点不可用,并不会导致消息丢失或服务不可用的情况,提升MQ集群的整体高可用性。