MQ全称为Message Queue,消息队列是应⽤程序和应⽤程序之间的通信⽅法。
RabbitMQ是⼀个Erlang开发的AMQP(Advanced Message Queuing Protocol )的开源实现。
ActiveMQ:基于JMS实现, ⽐较均衡, 不是最快的, 也不是最稳定的。ZeroMQ:基于C语⾔开发, ⽬前最好的队列系统。RabbitMQ:基于AMQP协议,erlang语⾔开发,稳定性好, 数据基本上不会丢失。RocketMQ:基于JMS,阿⾥巴巴产品, ⽬前已经捐献给apahce, 还在孵化器中孵化。Kafka:类似MQ的产品;分布式消息系统,⾼吞吐量, ⽬前最快的消息服务器, 不保证数据完整性。MQ是消息通信的模型;实现MQ的⼤致有两种主流⽅式:AMQP、JMS。
AMQP AMQP是⼀种协议,更准确的说是⼀种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进⾏限定,⽽是直接定义⽹络交换的数据格式。
JMS JMS即Java消息服务(JavaMessage Service)应⽤程序接⼝,是⼀个Java平台中关于⾯向消息中间件(MOM)的API,⽤于在两个应⽤程序之间,或分布式系统中发送消息,进⾏异步通信。
AMQP 与JMS 区别
JMS是定义了统⼀的接⼝,来对消息操作进⾏统⼀;AMQP是通过规定协议来统⼀数据交互的格式JMS限定了必须
使⽤Java语⾔;AMQP只是协议,不规定实现⽅式,因此是跨语⾔的。JMS规定了两种消息模式;⽽AMQP的消息
模式更加丰富.
JMS | AMQP | |
|---|---|---|
定义 | Java api | Wire-protocol |
跨语⾔ | 否 | 是 |
跨平台 | 否 | 是 |
RabbitMQ是由erlang语⾔开发,基于AMQP(Advanced Message Queue ⾼级消息队列协议)协议实现的消息队列,它是⼀种应⽤程序之间的通信⽅法,消息队列在分布式系统开发中应⽤⾮常⼴泛。
RabbitMQ官⽅地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:Hello Word简单模式,work⼯作模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式(通配符模式),RPC远程调⽤模式(远程调⽤,不太算MQ;不作介绍)
官⽹对应模式介绍:https://www.rabbitmq.com/getstarted.html
由Exchange、Queue、RoutingKey三个才能决定⼀个从Exchange到Queue的唯⼀的线路。
简单模式、工作队列模式:
生产消息和消费消息都是针对某一队列,区别是简单模式1个消费者,工作队列模式,多个消费者,单个消息只允许消费一次。
发布订阅模式、路由模式、通配符模式(Topic):
都是通过exchange进行转发,差异点主要在于exchange转发器,其它都一致。
**发布订阅模式:**fanout交换机,直接绑定,无规则。
**路由模式:**direct的交换机,指定key,进行发送。
**通配符模式(Topic):**topic的交换机,可以匹配多个key。
15672:(if management plugin is enabled.管理界⾯ )。
15671 :management监听端⼝。
5672, 5671 :(AMQP 0-9-1 without and with TLS 消息队列协议是⼀个消息协议)。
4369 :(epmd) epmd 代表 Erlang 端⼝映射守护进程。
25672: (Erlang distribution)。
Exchange有常⻅以下3种类型:
Fanout ⼴播:将消息交给所有绑定到交换机的队列, 不处理路由键。只需要简单的将队列绑定到交换机上。fanout类型交换机转发消息是最快的。
Direct:定向 把消息交给符合指定routing key 的队列. 处理路由键。需要将⼀个队列绑定到交换机上,要求该消息与⼀个特定的路由键完全匹配。如果⼀个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为 “dog” 的消息才被转发,不会转发 dog.puppy,也不会转发 dog.guard,只会转发dog。
路由模式: direct类型的交换机。
Topic:主题(通配符) 把消息交给符合routing pattern(路由模式)的队列. 将路由键和某模式进⾏匹配。此时队列需要绑定要⼀个模式上。符号 “#” 匹配⼀个或多个词,符号“”匹配不多不少⼀个词。因此“audit.#” 能够匹配到*“audit.irs.corporate”,但是“audit.*” 只会匹配到 “audit.irs”。
主题模式(通配符模式):topic类型的交换机。
Exchange(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。
https://download.csdn.net/download/weixin_44624117/85722977https://www.cnblogs.com/xuwenjin/p/14984910.html1 安装
rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm2 复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config3 修改配置文件
修改配置文件:rabbitmq.config。
把上面这一行放开(注意最后的逗号要去掉)。表示开启 guest 账户

4 启动 rabbitmq 的插件管理
rabbitmq-plugins enable rabbitmq_management5 启动rabbitmq服务
systemctl start rabbitmq-server6 访问页面
http://localhost:15672
账号密码:guest/guest7 其它命令
服务启动/停止/重启/查询状态
systemctl start rabbitmq-server #启动
systemctl restart rabbitmq-server #重启
systemctl stop rabbitmq-server #停止
systemctl status rabbitmq-server #查看状态rabbitmq 管理命令:rabbitmqctl
rabbitmqctl list_users #查看用户
rabbitmqctl list_queues #查看队列
rabbitmqctl status #查看borker状态
rabbitmqctl purget_queue 队列名称 #删除指定队列中数据
rabbitmqctl delete_queue 队列名称 #删除指定队列
rabbitmqctl help #帮助命令插件管理命令:rabbitmq-plugins
rabbitmq-plugins list #查看插件
rabbitmq-plugins enable #启动插件
rabbitmq-plugins disable #停止插件
rabbitmq-plugins help #帮助命令 RabbitMQ在安装好后,可以访问http://localhost:15672;其⾃带了guest/guest的⽤户名和密码;如果需要创建⾃定义⽤户;那么也可以登录管理界⾯后,如下操作:

像mysql拥有数据库的概念并且可以指定⽤户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于⼀个相对独⽴的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。相当于mysql的db。Virtual Name⼀般以/开头。
1. 创建Virtual Hosts

2.设置Virtual Hosts权限

创建simple_queue队列⽤于演示Hello World简单模式

pom.xml依赖<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>application.properties# IP
spring.rabbitmq.host=8.131.239.157
# 端口号
spring.rabbitmq.port=5672
# v-host(DB)
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.username=lydms
spring.rabbitmq.password=lydmsimport org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitTestTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleSend() {
rabbitTemplate.convertAndSend("simple_queue", "你好, ⼩兔⼦!====");
}
}import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}Hello World简单模式P:⽣产者: 也就是要发送消息的程序
C:消费者:消息的接受者,会⼀直等待消息到来。
queue:消息队列,图中红⾊部分。类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从其中取出消息。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitTestTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleSend() {
rabbitTemplate.convertAndSend("simple_queue", "你好, ⼩兔⼦!====");
}
}import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}Work queues⼯作队列模式Work Queues与⼊⻔程序的简单模式相⽐,多了⼀个或多个消费端,多个消费端共同消费同⼀个队列中的消息。
应⽤场景:对于任务过重或任务较多情况使⽤⼯作队列可以提⾼任务处理的速度。
在⼀个队列中如果有多个消费者,那么消费者之间对于同⼀个消息的关系是竞争的关系。
创建队列和代码都是一样的,只是多了一个消费者而已。

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitTestTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleSend() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("simple_queue", "你好, ⼩兔⼦!====" + i);
}
}
}import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}Publish/Subscribe发布与订阅模式
发布订阅模式:
1)创建队列

2)创建交换器

3)进行绑定

@Test
public void testSimpleSend() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend("fanout_exchange", "","你好, ⼩兔⼦!====" + i);
}
}和简单模式中一致
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "simple_queue")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}Routing路由模式**路由模式特点:**队列与交换机的绑定,不能是任意绑定了,⽽是要指定⼀个RoutingKey(路由key)消息的发送⽅在向 Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再把消息交给每⼀个绑定的队列,⽽是根据消息的Routing Key进⾏判断,只有队列的Routingkey与消息的Routing key完全⼀致,才会接收到消息2.2 创建队列。

图解:
P:⽣产者:向Exchange发送消息,发送消息时,会指定⼀个routing key。
X:Exchange(交换机):接收⽣产者的消息,然后把消息递交给与routing key完全匹配的队列。
C1:消费者:其所在队列指定了需要routing key 为 error 的消息。
C2:消费者:其所在队列指定了需要routing key 为 info、error、warning 的消息。
新建队列
创建两个队列分别叫做 direct_queue_insert 和 direct_queue_update⽤户演示。

创建交换器direct_exchange , 类型为 direct , ⽤于演示路由模式。

设置绑定:将创建的交换器 direct_exchange 和 direct_queue_insert , direct_queue_update 绑定在⼀起, 路由键Routing Key分别为 insertKey 和 updateKey。

@Test
public void testDirectExchange() {
for (int i = 0; i < 100; i++) {
if (i%2==0){
rabbitTemplate.convertAndSend("direct_exchange", "insertKey","你好, ⼩兔⼦!==inserKey==" + i);
}else {
rabbitTemplate.convertAndSend("direct_exchange", "updateKey","你好, ⼩兔⼦!==updateKey==" + i);
}
}
}页面队列信息:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "direct_queue_insert")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}收到消息:
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==52
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==54
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==56
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==58
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==60
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==62
======接收到的消息为:======你好, ⼩兔⼦!==inserKey==64Topics通配符模式(主题模式)Topic 类型与 Direct 相⽐,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使⽤通配符!
Routingkey : ⼀般都是有⼀个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配⼀个或多个词。*:匹配不多不少恰好1个词。举例:
lydms.#: 能够匹配 lydms.insert.abc 或者 lydms.insert。
lydms.:只能匹配 lydms.insert。


图解:
usa.# ,因此凡是以usa.开头的routing key都会被匹配到。#.news ,因此凡是以.news结尾的routing key都会被匹配。创建队列 topic_queue1 和 topic_queue1

创建交换器 topic_exchange , type类型为 topic。

设置绑定:
topic_queue1 绑定的Routing Key路由键为 item.*。
topic_queue2 绑定的Routing Key路由键为 item.#。

@Test
public void testTopicSend() {
rabbitTemplate.convertAndSend("topic_exchange", "lydms.select", "你好, ⼩兔⼦===lydms.select");
rabbitTemplate.convertAndSend("topic_exchange", "lydms.select.abc", "你不太好, ⼩兔⼦===lydms.select.abc");
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "topic_queue1")
public class SimpleListener {
@RabbitHandler
public void testListener(String message) {
System.out.println("======接收到的消息为:======" + message);
}
}