消息队列
MQ全称为Message Queue即消息队列
"消息队列"
是在消息的传输过程中保存消息的容器。⭐
以下信息来源于:大佬,感谢大佬!!分享
场景
MQ解耦
场景:
MQ
场景:
MQ
异步的方式从消息队列接收订单消息,再执行耗时的寻找操作
⭐
高级消息队列协议!
是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS RabbitMQ 就是基于 AMQP 协议实现的。
java 消息服务
JMS的客户端之间可以通过JMS服务
进行异步的消息传输。
JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范
允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。
它使分布式通信耦合度更低,消息服务更加可靠以及异步性。 ActiveMQ 就是基于 JMS 规范实现的。
有跨平台、跨语言特性。
交换机
提供的路由算法
AMQP可以提供多样化的路由方式来传递消息到消息队列 4种交换机类型,6种模式
JMS 仅支持 队列 和 主题/订阅 方式两种
消息队列
erlang
语言开发,所以安装环境需要安装 erlang
AMQP
(Advanced Message Queue 高级消息队列协议
)协议实现的消息队列
⭐
消息队列服务进程
:此进程包括两个部分:Exchange交换机
和Queue队列
交换机
:按一定的规则将消息路由转发到某个队列,对消息进行过虑。
一个有四种类型:Direct, Fanout, Topic, Headers
消息队列
:存储消息的队列。
队列中去!
⭐
RabbitMQ消息传递模型的核心思想是:
生产者永远不会将任何消息直接发送到队列,通常生产者甚至不知道消息是否会被传递到任何队列。生产者只能向交换机(Exchange)发送消息。
交换机是一个非常简单的东西。一边接收来自生产者的消息,另一边将消息推送到队列。
✔
所以需要安装对应的,运行环境!
无脑下一步即可!
ERLANG_HOME=erlang安装目录
注意别在中文目录下!
path中添加: %ERLANG_HOME%\bin
下一步...
之后
start 启动服务即可…
就可以通过浏览页面,方便的管理Rabbit了
安装目录下: 管理员身份运行 rabbitmq-plugins.bat enable rabbitmq_management
http://localhost:15672
用户/密码 默认: guest
如果开始菜单中没有,服务则进入安装目录下sbin目录手动启动:
rabbitmq-service.bat install 安装服务
bbitmq-service.bat start 启动服务
如上
RabbitMQ 服务启动不…
Windows账号名是中文的!!!
如果你们也是这个问题,也可以试试我这个方法!
rabbitmq-service.bat remove #删除服务
set RABBITMQ_BASE=D:\WSMwork\RabbitMQ\data #设置:目录, 自行设置
#重新生产服务,安装管理插件!
rabbitmq-service.bat install
rabbitmq-plugins enable rabbitmq_management
启动服务不同的人有,不同的写法不过大致相同: 这两天看了好几篇博客好几种写法...
发送者Producter
一个消息接收者Consumer
两者,也大致相同: 创建链接…创建通道…绑定. 读/写
对于这种链接操作可以,创建一个工具类:util处理pom.xml
<!-- rabbitmq依赖! -->
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
</dependencies>
ConnectionUtil.Java
//注意MQ 的连接对象类型...
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//RabbitMQ 连接配置类:
public class ConnectionUtil {
//静态方法,需要连接对象直接调用即可!
public static Connection getConnection() throws IOException, TimeoutException {
//获取一个链接工程
ConnectionFactory factory = new ConnectionFactory();
//设置属性
factory.setHost("127.0.0.1"); //ip
factory.setPort(5672); //端口
factory.setVirtualHost("/"); //虚拟主机地址,虚拟机相当于一个独立的mq服务器, /默认存在!
factory.setUsername("guest"); //用户密码
factory.setPassword("guest");
Connection connection = factory.newConnection();
return connection;
}
}
虚拟主机 就是一个存储位置..
查看交换机…
如图,显而易见,非常简单就是一个一发一读
的过程…
不需要交换机!
HellowProducter.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.wsm.util.ConnectionUtil;
//发送者
public class HellowProducter {
//发送消息
public void send() throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道!Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel();
/**设置消息队列的属性!
* queue :队列名称
* durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
* exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* autoDelete :队列不再使用时是否自动删除此队列,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
* */
channel.queueDeclare("hello.queue", true, false, false, null);
//要发送的消息
String mess = "一条消息:" + new SimpleDateFormat("yyyy-MM-dd hh:mm-ss").format(new Date());
/**发送消息,参数:
* exchange :指定的交换机,不指定就会有默认的....
* routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
* props :消息包含的属性: 后面介绍,可以是一个一个对象...
* body :发送的消息,AMQP以字节方式传输...
* */
channel.basicPublish("", "hello.queue", null, mess.getBytes());
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理
channel.close();
connection.close();
}
//main 启动运行...
public static void main(String[] args) throws Exception {
HellowProducter sp = new HellowProducter();
sp.send();
}
}
HelloConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
//消息接收者:
public class HelloConsumer {
public void revice() throws Exception {
//创建连接:
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello.queue", true, false, false, null);
//收到消息后用来处理消息的回调对象
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope: 可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//将返回的字节数组转换成 String 打印输出!
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复验证,这就是Unacked 为返回ack的数据
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume("hello.queue", true, defaultConsumer);
}
//main运行输出!
public static void main(String[] args)throws Exception {
HelloConsumer sc =new HelloConsumer();
sc.revice();
}
}
发送者
接收者:
升级版!
多个消费者,对应一个发送者,发送者 产生的消息存在队列种,队列会以复杂均衡形式
轮询的发送给多个消费者美团外卖:用户下单——后台内部要联系商家 骑手 生产订单 处理...
HellowProducter.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
//发送者
public class WorkProducter {
//发送消息
public void send() throws Exception {
//创建连接
Connection connection = ConnectionUtil.getConnection();
//创建通道!Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
Channel channel = connection.createChannel();
/**设置消息队列的属性!
* queue :队列名称
* durable :是否持久化 如果持久化,mq重启后队列数据还在! (队列是在虚拟路径上的...)
* exclusive :队列是否独占此连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* autoDelete :队列不再使用时是否自动删除此队列,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* arguments :队列参数 null,可以设置一个队列的扩展参数,需要时候使用!比如:可设置存活时间
* */
channel.queueDeclare("work.queue", true, false, false, null);
//要发送的消息
String mess = "一条消息:" + new SimpleDateFormat("yyyy-MM-dd hh:mm-ss").format(new Date());
/**发送消息,参数:
* exchange :指定的交换机,不指定就会有默认的....
* routingKey :路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机routingKey设置为队列的名称
* props :消息包含的属性: 后面介绍,可以是一个一个对象...
* body :发送的消息,AMQP以字节方式传输...
* */
channel.basicPublish("", "work.queue", null, mess.getBytes());
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理
channel.close();
connection.close();
}
//main 启动运行...
public static void main(String[] args) throws Exception {
WorkProducter sp = new WorkProducter();
sp.send();
}
}
两个接收者代码一模一样!!
WorkConsumer2.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
//消息接收者:
public class WorkConsumer2 {
public void revice() throws Exception {
//创建连接:
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work.queue", true, false, false, null);
//收到消息后用来处理消息的回调对象
//basicConsume方法监听到数据后就会执行这个 handleDelivery 回调方法,进行数据展示处理!
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
/**
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//将返回的字节数组转换成 String 打印输出!
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复验证,这就是Unacked 为返回ack的数据
* 3、callback,消费方法,当消费者接收到消息要执行的方法
*/
channel.basicConsume("work.queue", true, defaultConsumer);
}
//main运行输出!
public static void main(String[] args)throws Exception {
WorkConsumer2 sc =new WorkConsumer2();
sc.revice();
}
}
交换机类型:Fanout
发送者消息!
fanout类型
Fanout也称为 广播模式
PublishProducter.Java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.text.SimpleDateFormat;
import java.util.Date;
public class PublishProducter {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
/**声明交换机
* 交换机名
* 交换机类型 FANOUT
* */
channel.exchangeDeclare("publish.exchange", BuiltinExchangeType.FANOUT);
/**声明队列/
channel.queueDeclare("email.queue", true, false, false, null);
/**交换机绑定队列! 发送者可以省略,发送者只要知道往那个交换机上发送即可!设置不需要知道队列!!
*指定的队列,与指定的交换机关联起来
*指定交换机
*第三个参数时 routingKey, 由于是fanout交换机, 这里忽略 routingKey; 但它是必须参数所以必须要加...而且无论加不加都不会产生影响!
*/
// channel.queueDeclare("email.queue", true, false, false, null);
// channel.queueDeclare("sms.queue", true, false, false, null);
//对于 fanout 类型的交换机,routingKey会被忽略,但不允许null值,允许 ""
// channel.queueBind("email.queue", "publish.exchange", "");
// channel.queueBind("sms.queue", "publish.exchange", "");
//发送的消息
String mess = "email和sms共同的一条消息:" + new SimpleDateFormat("yyyy-MM-dd hh:mm-ss").format(new Date());
//发送消息: 到交换机上,交换机根据 FANOUT类型给,每一个队列发送消息...
channel.basicPublish("publish.exchange", "", null, mess.getBytes());
connection.close();
}
}
Email EmailConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmailConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("publish.exchange", BuiltinExchangeType.FANOUT);
//绑定交换机队列
//自动生成对列名, 非持久,独占,自动删除
// String queuename = channel.queueDeclare().getQueue();
//绑定!
// channel.queueBind(queuename, "publish.exchange", "");
//或指定队列绑定!当然在这之前要存在声明!
channel.queueBind("email.queue", "publish.exchange", "");
//消息回调
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置...
channel.basicConsume("email.queue",true,defaultConsumer);
}
}
SMS SMSConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SMSConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("publish.exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("sms.queue", "publish.exchange", "");
//消息回调
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//设置...
channel.basicConsume("sms.queue",true,defaultConsumer);
}
}
生产者永远不会将任何消息直接发送到队列,通常生产者甚至不知道消息是否会被传递到任何队列。
生产者只能向交换机(Exchange)发送消息。 所以生产者,可以不用指定/声明队列!
只需要在,消费者声明 绑定数据队列即可! 当然也可以使用程序:
生成对列名绑定;
自动生成对列名, 非持久,独占,自动删除 String queuename = channel.queueDeclare().getQueue();
绑定 channel.queueBind(queuename, "publish.exchange", "");
publish/subscribe
与work queues
有什么区别区别:
相同点:
发布订阅模式可以指定自己专用的交换机
交换机类型:DIRECT
交换机/队列
时候,需要指定 routing key 一个队列,可以设置多个 routingkey;
routing key
来匹配, 确定消息发送到那个队列上!
direct
向 direct 类型,交换机,发送数据必须携带,Routing key
RoutingProducter.Java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class RoutingProducter {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//指定交换机 及 交换机的类型
channel.exchangeDeclare("rout.exchange", BuiltinExchangeType.DIRECT);
// 定义队列.....发送者可以不需要!
// channel.queueDeclare("rout.queue", true, false, false, null);
//要发送的信息!
String mess = "一条消息:" + new SimpleDateFormat("yyyy-MM-dd hh:mm-ss").format(new Date());
//开始发送... 发送的交换机 发送的key队列 发送的特殊类型数据 发送的方式字节数组!
//发送数据时候指定 routingkey 取数据时候,根据指定的routingkey 而获得对应的routingkey 匹配的值!
channel.basicPublish("rout.exchange", "email.key", null, (mess+"email").getBytes()); //emial.key
channel.basicPublish("rout.exchange", "sms.key", null, (mess+"sms").getBytes()); //sma.key
//关闭资源!
connection.close();
}
}
EmailkeyConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class EmailkeyConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("rout.exchange", BuiltinExchangeType.DIRECT);
//使用:自动生成的队列名;
String queueName = channel.queueDeclare().getQueue();
//读取数据时候,只要根据绑定 routing key 交换机就会往对应队列上发送数据;
channel.queueBind(queueName, "rout.exchange", "email.key");
//一个队列可以绑定多个Routingkey, 从而实现多种类型值...
// channel.queueBind(queueName, "rout.exchange", "sms.key"); //这里可以松解注释多次进行测试...
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//使用默认 指定要监测的队列...
channel.basicConsume(queueName,true,defaultConsumer);
}
}
String queueName = channel.queueDeclare().getQueue();
获取到一个临时队列名称。
channel.queueDeclare():创建一个非持久化、独立、自动删除的队列名称
此队列是临时的,随机的,一旦我们断开消费者,队列会立即被删除
随机队列名,如amq.gen-jzty20brgko-hjmujj0wlg只有一个 email.key 没有松开注释的
松开注释的 email.key sms.key
nice, 这里我尽然搞了半小时才搞明白!踩了几个坑难受!
通过Routing key路由key
来实现!在交换机队列 绑定时候,指定 Routingkey 可以设置多个
发送者发送数据, 根据 routingkey 指定发送消息的类型
接收者,通过 绑定交换机 + 队列 + routing key
来匹配, 确定消息发送到那个队列上!
官方场景:
控制台
日志记录文件
它们所需要写入的数据是不一样的!,却来自一个发送者!!注意:
发送者 发送消息时候需指定 Routingkey 或 队列名
队列可以有多种别名 Routingkey
表示消息的不同类型
交换机 + 队列 + Routingkey 进行绑定
就可以根据队列名,来获取需要类型的数据集合了!
接收者 监听方法,指定监听的队列 channel.basicConsume(queueName,true,defaultConsumer);
交换机类型:TOPIC
该模式与Routingkey 非常类型,就相当于是一个 动态路由模式!!
Routingkey
来转发消息到指定的队列。
#:匹配一个或多个词
举例:wsm.# 等于:wsm.1 / wsm.w.s.m / wsm.sm .后多个单词
*:匹配不多不少恰好1个词
举例:wsm.* 等于:wsm.sm / wsm.m .后一个单词
修改上面的Routingkey 即可:
TopicsProducter.Java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class TopicsProducter {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//指定交换机 及 交换机的类型
channel.exchangeDeclare("tpc.exchange", BuiltinExchangeType.TOPIC);
//要发送的信息!
String mess = "一条消息:" + new SimpleDateFormat("yyyy-MM-dd hh:mm-ss").format(new Date());
channel.basicPublish("tpc.exchange", "wsm.email.key", null, (mess+"email").getBytes()); //emial.key
channel.basicPublish("tpc.exchange", "wsm.sms.key", null, (mess+"sms").getBytes()); //sma.key
//关闭资源!
connection.close();
}
}
TopicsConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicsConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("tpc.exchange", BuiltinExchangeType.TOPIC); //指定交换机类型TOPIC
//使用:自动生成的队列名;
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "tpc.exchange", "wsm.#"); //通配符Routingkey wsm.# 所有的key!
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String str = new String(body, "UTF-8");
System.out.println(str);
}
};
//使用默认 指定要监测的队列...
channel.basicConsume(queueName,true,defaultConsumer);
}
}
TOPIC
Routingkey 设置为:wsm.#
匹配所有 wsm.开头的Routing key… 因此 wsm.email.key wsm.sms.key 都将匹配!headers类型的交换器的性能很差,不建议使用。
发送消息时候,提供:发送消息 和 要匹配的map
RabbitMQ会获取到该消息的headers,
对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。 如果匹配,则该消息到此队列中
HederProducter.Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;
//生产者
public class HederProducter {
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("head.exchange", BuiltinExchangeType.HEADERS);
//发送的内容
String str ="header的内容lalala~~";
//发送的head map, 需要与队列的map 规范匹配才可以发送成功! 不然发送失败!
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2");
param.put("name","wsm");
//设置Map 匹配参数!
AMQP.BasicProperties.Builder builder=new AMQP.BasicProperties.Builder();
builder.headers(param);
//发送参数 两次!
channel.basicPublish("head.exchange","",builder.build(),str.getBytes());
//关闭资源!
connection.close();
}
}
HeaderConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//消费者
public class HeaderConsumer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("head.exchange", BuiltinExchangeType.HEADERS); //交换机类型 heades
//设置队列上的 map 参数,用于匹配请求时候的参数!
//特殊参数 x-match 值 all 或 any
//all 在发布消息时携带的map 必须和绑定在队列上的所有map 完全匹配
//any 只要在发布消息时携带的有一对键值map 满足队列定义的多个参数map的其中一个就能匹配上
//注意: 这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//声明队列 传入
channel.queueDeclare("headqueue", false, false, false, null);
//绑定
// 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串""
channel.queueBind("headqueue", "head.exchange", "",param); //map参数,规范!
//----------------------------------------以上是声明 交换机/队列 绑定规范...
//以下是,消费者对消息的监听,获取!----------------------------------------....
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Map<String, Object> headers = properties.getHeaders();
String str = new String(body, "UTF-8");
System.out.println("内容:"+str);
System.out.println("Map传入参数数据!"+headers);
}
};
channel.basicConsume("headqueue", true, defaultConsumer);
}
}
运行消费者 —— 生产者… 并尝试改变参查看效果..
//消费者:队列规则all 所有匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2");
param.put("name","wsm");
//不匹配
//生产者
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","all");
param.put("id","1");
param.put("name","wsm");
//匹配
//消费者:队列规则any 一个匹配即可
Map<String ,Object> param = new HashMap<String, Object>();
param.put("x-match","any");
param.put("id","1");
param.put("name","wsm");
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","1"); //匹配
//生产者:传入head map;
Map<String ,Object> param = new HashMap<String, Object>();
param.put("id","2"); //不匹配 key /value 都要匹配才可以
....
channel.confirmSelect()
channel.addConfirmListener(ConfirmListener listener);
, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!ConfimProducer.Java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.zb.util.ConnectionUtil;
import java.io.IOException;
public class ConfimProducer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//声明交换机 类型无所谓...
channel.exchangeDeclare("confim.exchage", BuiltinExchangeType.TOPIC);
//开启确认机制;
channel.confirmSelect();
//发送消息
channel.basicPublish("confim.exchage", "confim.key", null, "一条消息".getBytes());
//开启 confrim 消息认证机制...
channel.addConfirmListener(new ConfirmListener() {
/**发送成功!执行方法..
* long: 返回消息的,序列号
* boolean: 是否允许消息的批量发送!
*/
public void handleAck(long l, boolean b) throws IOException {
System.out.println(l);
System.out.println(b);
System.out.println("ack"); //已经发送ack确认
}
//发送失败!执行... 一般发送这种事情的处理方案...
public void handleNack(long l, boolean b) throws IOException {
System.out.println(l);
System.out.println(b);
System.out.println("nack"); //nack没有确认!
}
});
}
}
ConfimConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
public class ConfimConsumer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//声明交换机
channel.exchangeDeclare("confim.exchage", BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare("confim.queue", true, false, false, null);
//绑定数据
channel.queueBind("confim.queue", "confim.exchage", "confim.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
};
//监听消息, 传入回调函数...
//开启ack 自动认证...
channel.basicConsume("confim.queue", true, consumer);
}
}
没办法模拟成功!案例
ReturnProducter.Java
import com.rabbitmq.client.*;
import com.zb.util.ConnectionUtil;
import java.io.IOException;
public class ReturnProducter {
public static void main(String[] args)throws Exception {
//连接!
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
String exchangeName = "return.exchage"; //设置交换机
String routingKey = "return.key"; //不存在的Routingkey: return1.key
//设置Return Listener监听.... 当然发送消息,交换机/队列错误..没有发送成功就会尽然这个方法!
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("交换机exchange:"+exchange);
System.out.println("交换机exchange数据:"+new String(bytes,"UTF-8"));
}
});
//发送一条数据,
channel.basicPublish(exchangeName, routingKey,true, null, "return一条消息".getBytes());
/**basicPublish 第三个参数...
* true: 时,交换器无法根据自动类型TOPIC 根据路由键找到一个符合条件的队列,那么RabbitMq会调用Basic.Ruturn命令将消息返回给生产者: Return Listener监听
* false:时,出现上述情况消息被直接丢弃
*/
}
}
ReturnConsumer.Java
import com.rabbitmq.client.*;
import com.zb.util.ConnectionUtil;
import java.io.IOException;
public class ReturnConsumer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//声明: 交换机 队列 绑定!
channel.exchangeDeclare("return.exchage", BuiltinExchangeType.TOPIC);
channel.queueDeclare( "return.queue", true, false, false, null);
channel.queueBind( "return.queue", "return.exchage", "return.#");
//接收数据 回调!
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
};
channel.basicConsume( "return.queue", true, consumer);
}
}
运行 接收者 —— 发送者… 并手动更改 发送者的 Routingkey 值, 使数据发送不成功!
channel.basicPublish(exchangeName, routingKey,false, null, "return一条消息".getBytes());
限流是限制的消费端的数据流动!
QosProducter.Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
public class QosProducter {
public static void main(String[] args) throws Exception {
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
String exchangeName = "qos.exchage";
String routingKey = "qos.key";
//循环发送五条数据!
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, true, null, ("qos一条消息" + i).getBytes());
}
}
}
QosConsumer.Java
public class QosConsumer {
public static void main(String[] args) throws Exception {
Connection conn = ConnectionUtil.getConnection();
final Channel channel = conn.createChannel();
//创建 交换机 队列 并进行绑定!
channel.exchangeDeclare("qos.exchage", BuiltinExchangeType.TOPIC);
channel.queueDeclare("qos.queue", true, false, false, null);
channel.queueBind("qos.queue", "qos.exchage", "qos.#");
/**限流 Void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
*prefetchSize:0 不限制消息大小
*prefetchSize 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该Consumer将block(阻塞)掉,直到有消息ack
*Global:true\false是否将上面设置应用于Channel;简单来说,就是上面限制是Channel级别的还是Consumer级别
*/
channel.basicQos(0, 3, false); //每次通过三条数据!
//发送消息,没有返回ack 的回调方法!
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("===========服务端发送的信息===============");
System.out.println(new String(body, "UTF-8"));
//代码手动回复ack,告诉服务端,以处理完当前的消息,你可以继续发下一条消息了!
//测试程序演示,注释掉改代码,不然它每一次都通过代码手动回复了...就看不到效果了...
// channel.basicAck(envelope.getDeliveryTag(),false); // false 处理批量数据
}
};
//发送数据,因为要查看限流的效果,默认的自动回复设置为 false 不自动回复!
// channel.basicConsume("qos.queue", true, consumer);
channel.basicConsume("qos.queue", false, consumer);
}
}
注意:
一般都是记录下来,人工进行整改!
MQ服务
关闭重回队列
,也就是设置为False;因为重回队列消息有很大概率依然会处理失败
所以只要知道这个失败的消息,会重新回到队列就OK了…这是Rabbit MQ的一个特点吧…AckProducter.Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;
public class AckProducter {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//循环发送5条消息! 交换机 队列 都在消费者,那边创建好了!
for (int i = 0; i < 5; i++) {
Map<String, Object> header = new HashMap<String, Object>();
header.put("num",i);
//设置发送规格!虽然交换机不是hede 模式,不会进行规格验证!但照样而可以发送规格来,进行处理一些特殊方式!
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.deliveryMode(2) //设置消息是否持久化,1:非持久化 2:持久化
.contentEncoding("UTF-8")
.headers(header).build();
channel.basicPublish( "ack.exchage" ,"ack.key",true, properties, ("ack一条消息" + i).getBytes());
/**basicPublish 第三个参数...
* true: 时,交换器无法根据自动类型TOPIC 根据路由键找到一个符合条件的队列,那么RabbitMq会调用Basic.Ruturn命令将消息返回给生产者: Return Listener监听
* false:时,出现上述情况消息被直接丢弃
*/
}
channel.close();
conn.close();
}
}
AckConsumer.Java
import com.rabbitmq.client.*;
import com.zb.util.ConnectionUtil;
import java.io.IOException;
import java.util.Map;
public class AckConsumer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
final Channel channel = conn.createChannel();
//声明 交换机 队列 并进行绑定!
channel.exchangeDeclare("ack.exchage", BuiltinExchangeType.TOPIC); //交换机类型是 TOPIC
channel.queueDeclare("ack.queue", true, false, false, null);
channel.queueBind("ack.queue", "ack.exchage", "ack.#");
//对队列中消息处理的回调方法...
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
System.out.println("===========处理队列中消息!===========");
//为了方便看到效果,主线程休眠 2秒!
Thread.sleep(2000);
//获取到发送消息的 Map规格.... 因为交换机类型不是 HEADERS 所以Map规格不匹配照样发送成功!
Map<String, Object> headers = properties.getHeaders();
System.out.println(new String(body, "UTF-8"));
System.out.println("=========" + headers);
//模拟消息发送失败!头消息不为null 且 编号1的元素...
if (headers!=null && headers.get("num").toString().equals("1")) {
/**消息接收失败重回队列!不返回ack !
* void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
* deliveryTag :该消息的index
* multiple :是否批量. true:将一次性拒绝所有小于deliveryTag的消息。
* requeue :被拒绝的是否重新入队列, true将消息继续放到队列的末尾在此发送,false 不在放到队列中直接销毁!
* 本次设置true不拒绝,本次测试就是要重回队列,查看效果!
*/
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
//消息接收成功不重回队列,返回 ack... 消息接收成功!
channel.basicAck(envelope.getDeliveryTag(), false);
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("异常=================");
}
}
};
//监听队列中的消息..
channel.basicConsume("ack.queue", false, consumer);
}
}
生存时间
主要针对消息设置,跟交换机、队列、消费者设置毫无关系
TTLProducter.Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;
public class TTLProducter {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//通过channel发送消息
String msg = "hello rabbitmq!";
AMQP.BasicProperties properties = new AMQP.BasicProperties();
//设置规格!
Map<String,Object> headers = new HashMap<String, Object>();
headers.put("name", "wsm");
properties = properties.builder()
// 设置编码为UTF8
.contentEncoding("UTF-8")
// 设置自定义Header
.headers(headers)
// 设置消息失效时间
.expiration("5000").build();
//发送两条消息...
//设置了消息超时时间为5秒, 5秒后消息自动删除
channel.basicPublish("", "ttl_queue", properties, msg.getBytes());
//没有设置消息存活时间,消息存活时间根据队列来决定
channel.basicPublish("", "ttl_queue", null, msg.getBytes());
//关闭连接
channel.close();
conn.close();
}
}
TTLConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class TTLConsumer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//设置规格...
Map<String, Object> arguments = new HashMap<>();
// 设置队列超时时间为10秒
arguments.put("x-message-ttl", 10000); //固定的key属性规格!x-message-ttl 设置队列消息存储时间!
//声明(创建)一个队列... 这里没有绑定/指定交换机!
String queueName = "ttl_queue";
//绑定队列!
channel.queueDeclare(queueName,true, false, false, arguments);
//回调数据展示!
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//获取消息
System.out.println(new String(body, "UTF-8"));
//获取head
Map<String, Object> headers = properties.getHeaders();
System.out.println(headers);
}
};
//监听队列中的消息..
channel.basicConsume("ttl_queue", false, consumer);
}
}
创建出队列!
之后关闭即可!展示信息,不容易观看,直接在页面展示!第一条消息过期
—— 10秒后队列中消息过期
死信队列
交换机上!
,这个Exchange就是DLX
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性;
DlxProducter.Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wsm.util.ConnectionUtil;
public class DlxProducter {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
Channel channel = conn.createChannel();
//发送信息
//设置规格map
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder()
.deliveryMode(2) //设置消息是否持久化,1: 非持久化 2:持久化
.contentEncoding("UTF-8")
.expiration("5000") //设置消息过期时间... 死信产生时间!
.build();
//发送消息
channel.basicPublish( "dlx.exchage", "dlx.key", properties, ("一条消息!!").getBytes());
channel.close();
conn.close();
}
}
DlxConsumer.Java
import com.rabbitmq.client.*;
import com.wsm.util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
//连接
Connection conn = ConnectionUtil.getConnection();
final Channel channel = conn.createChannel();
//设置一个普通交换机 队列 并绑定...
channel.exchangeDeclare("dlx.exchage", BuiltinExchangeType.TOPIC);
//设置绑定死信交换机参数... 如果: 交换机中出现死信,信息直接销毁并存放到死信交换机中! sx.exchange
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "sx.exchange");
//普通队列
channel.queueDeclare("dlx.queue", true, false, false, agruments);
//绑定
channel.queueBind("dlx.queue", "dlx.exchage", "dlx.#");
//定义死信交换机 TOPIC类型...
//Routingkey # 匹配所有的Routingkey 的值...
channel.exchangeDeclare("sx.exchange", BuiltinExchangeType.TOPIC);
channel.queueDeclare("sx.queue", true, false, false, null);
channel.queueBind("sx.queue", "sx.exchange", "#");
//监听到值后回调获取信息,进行处理!
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
};
//监听死信队列上的值...
channel.basicConsume("dlx.queue", true, consumer);
}
}
死信
了,就换个交换机存储!
使当,产生死信时候进行处理的一个(死信)交换机
//死信交换机 TOPIC类型…
//Routingkey # 匹配所有的Routingkey 的值…
channel.exchangeDeclare(“sx.exchange”, BuiltinExchangeType.TOPIC);
channel.queueDeclare(“sx.queue”, true, false, false, null);
channel.queueBind(“sx.queue”, “sx.exchange”, “#”);
定义交换机...
关闭/开启都运行一次!
正常工作
因为, 程序开启,未到过期时间… 消息不死正常输出!
不工作
5秒后死信队列上出现数据!
延时队列
延迟队列
消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。
订单超时
并设置了过期时间
但并不会消费…终于要写完了!!
重新开启一个项目工程!
依赖
:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 父依赖! -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.zb</groupId>
<artifactId>bootMQ</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- amqp依赖! -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- web依赖! -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
application.yml
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username: guest #用户
password: guest #密码
virtual-host: / #虚拟路径!
#有关AmqpTemplate的配置
# template:
# retry:
# enabled: true
# initial-interval: 10000ms
# max-interval: 300000ms
# multiplier: 2
# exchange: topic.exchange
# publisher-confirms: true
template:有关AmqpTemplate的配置
当然如果consumer只是接收消息而不发送,就不用配置template相关内容。
MQConfig.Java
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration //Boot配置类注解;
public class MQConfig {
//队列
public static final String QUEUE_EMAIL = "boot.email";
//交换机
public static final String EXCHANGE_TOPIC_INFORM = "exchange.topic.boot";
//创建交换机,并交给Spring管理
@Bean(EXCHANGE_TOPIC_INFORM)
public Exchange createExchange() {
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_INFORM).durable(true).build();
}
//创建队列交给Spring管理
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean(QUEUE_EMAIL)
public Queue createEmailQueue() {
Queue queue = new Queue(QUEUE_EMAIL);
return queue;
}
/**绑定交换机/队列 并指定Rontingkey!
*
* @param exchange 指定交换机 @Qualifier注解根据参数名进行匹配Spring映射!
* @param queue 指定队列!
* @return
*/
@Bean
public Binding createEmailToExchange(@Qualifier(EXCHANGE_TOPIC_INFORM) Exchange exchange, @Qualifier(QUEUE_EMAIL) Queue queue) {
return BindingBuilder.bind(queue).to(exchange).with("info.#").noargs();
}
//如果后面还需要进行什么配置...在该配置类中继续扩展...
}
SendService.Java
import com.zb.config.MQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
@Service
//消息生产者,业务层...
public class SendService {
@Autowired
private RabbitTemplate rabbitTemplate; //amqp依赖中提供的 rabbit类!
//发送一条消息!
public void send() {
String msg = "一条boot的消息";
Map<String ,Object> param = new HashMap<>();
param.put("msg",msg);
/**
* 指定发送交换机!
* Routingkey
* 发送的数据..Object类型,发送时会进行序列化转换! 这边传什么类型——接收者 接收就以该类型接收即可!
*/
rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TOPIC_INFORM, "info.email", param);
}
}
MQListener.Java
import com.zb.config.MQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component //类上的注解,注册到Spring容器
//MQ监听类: 用来获取数据
public class MQListener {
/**
* @RabbitListener(queues = MQConfig.QUEUE_EMAIL) 注解: queues 要监听的队列...
*
* @param param 监听消息结果返回的类型!
* 监听操作是实时的,只要队列中有数据,这个方法就会执行!
*/
@RabbitListener(queues = MQConfig.QUEUE_EMAIL)
public void revice(Map<String, Object> param) {
System.out.println(param);
}
}
监听属性!
MyBootApp.Java
import com.zb.service.SendService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class MyBootApp {
public static void main(String[] args) {
ConfigurableApplicationContext run = SpringApplication.run(MyBootApp.class, args);
//主程序运行,并获取消息生产者... 执行生产数据!
SendService bean = run.getBean(SendService.class);
bean.send();
}
}