MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBM WEBSPHERE MQ等等。
队列就像存放了商品的仓库或者商店,是生产商品的工厂
和购买商品的用户
之间的中转站
在rabbitMQ中,信息流从你的应用程序出发,来到Rabbitmq的队列,所有信息可以只存储在一个队列中。队列可以存储很多信息,因为它基本上是一个无限制的缓冲区,前提是你的机器有足够的存储空间。
多个生产者可以将消息发送到同一个队列中,多个消息者也可以只从同一个队列接收数据。
spring.application.name=rabbitmq-direct-consumer
spring.rabbitmq.host=10.136.196.159
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 设置交换机 exchange 名称
mq.config.exchange=log.direct
# info 路由键
mq.config.queue.info.routing.key=log.info.routing.key
# info 队列名称
mq.config.queue.info=log.info
# error 路由键
mq.config.queue.error.routing.key=log.error.routing.key
# error 队列名称
mq.config.queue.error=log.error
spring.application.name=rabbitmq-direct-provider
spring.rabbitmq.host=10.136.196.159
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 设置交换机 exchange 名称
mq.config.exchange=log.direct
# info 路由键
mq.config.queue.info.routing.key=log.info.routing.key
# error 路由键
mq.config.queue.error.routing.key=log.error.routing.key
InfoReceiver
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接收者
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Info receiver:" + msg);
}
}
ErrorReceiver
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接受者
* @author wolf
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Error receiver:" + msg);
}
}
package com.pyy.mq.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息发送者
* @author wolf
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
// 交换机名称
@Value("${mq.config.exchange}")
private String exchange;
// 路由键
@Value("${mq.config.queue.info.routing.key}")
private String routingkey;
/**
* 发送消息方法
* @param msg
*/
public void send(String msg) {
// 向指定交换机 exchange 中通过执行的路由键 routingkey 中发送消息
//参数一:交换器名称。
//参数二:路由键
//参数三:消息
this.rabbitAmqpTemplate.convertAndSend(exchange, routingkey, msg);
}
}
customer 配置文件
spring.application.name=rabbitmq-topic-consumer
spring.rabbitmq.host=10.136.196.159
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 设置交换机 exchange 名称
mq.config.exchange=log.topic
# info 队列名称
mq.config.queue.info=log.info
# error 队列名称
mq.config.queue.error=log.error
# logs 队列名称
mq.config.queue.logs=log.all
provider 配置文件
spring.application.name=rabbitmq-topic-provider
spring.rabbitmq.host=10.136.196.159
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
# 设置交换机 exchange 名称
mq.config.exchange=log.topic
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接受者
* @author wolf
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.log.info"
)
)
public class InfoReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Info receiver:" + msg);
}
}
ErrorReceiver
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接受者
* @author wolf
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.error.info"
)
)
public class ErrorReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Error receiver:" + msg);
}
}
LogsReceiver
package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
/**
* 消息接受者
* @author wolf
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
key = "*.log.*"
)
)
public class LogsReceiver {
/**
* 接收消息方法,采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("All receiver:" + msg);
}
}
package com.pyy.mq.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息发送者
* @author wolf
*/
@Component
public class UserSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
// 交换机名称
@Value("${mq.config.exchange}")
private String exchange;
/**
* 发送消息方法
* @param msg
*/
public void send(String msg) {
// 向指定交换机 exchange 中通过执行的路由键 routingkey 中发送消息
this.rabbitAmqpTemplate.convertAndSend(exchange, "user.log.info", "user.og.info-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "user.log.error", "user.log.error-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "user.log.debug", "user.log.debug-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "user.log.warn", "user.log.warn-->" + msg);
}
}
ProductProvider
package com.pyy.mq.service;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 消息发送者
* @author wolf
*/
@Component
public class ProductSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
// 交换机名称
@Value("${mq.config.exchange}")
private String exchange;
/**
* 发送消息方法
* @param msg
*/
public void send(String msg) {
// 向指定交换机 exchange 中通过执行的路由键 routingkey 中发送消息
this.rabbitAmqpTemplate.convertAndSend(exchange, "product.log.info", "product.log.info-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "product.log.error", "product.product.log.error-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "product.log.debug", "product.log.debug-->" + msg);
this.rabbitAmqpTemplate.convertAndSend(exchange, "product.log.warn", "product.log.warn-->" + msg);
}
}
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.password=123456
#设置交换器的名称
mq.config.exchange=order.fanout
#短信服务队列名称
mq.config.queue.sms=order.sms
#push服务队列名称
mq.config.queue.push=order.push
Provider
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu
spring.rabbitmq.password=123456
#设置交换器的名称
mq.config.exchange=order.fanout
SmsReceiver
/**
* 消息接收者
* @author Administrator
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
* key:路由键
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(
value="${mq.config.queue.sms}",autoDelete="true"),
exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT)
)
)
public class SmsReceiver {
/**
* 接收消息的方法。采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("Sms........receiver: "+msg);
}
}
PushReceiver
/**
* 消息接收者
* @author Administrator
* @RabbitListener bindings:绑定队列
* @QueueBinding value:绑定队列的名称
* exchange:配置交换器
*
* @Queue value:配置队列名称
* autoDelete:是否是一个可删除的临时队列
*
* @Exchange value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings=@QueueBinding(value=@Queue(
value="${mq.config.queue.push}",autoDelete="true"),exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.FANOUT)
)
)
public class PushReceiver {
/**
* 接收消息的方法。采用消息队列监听机制
* @param msg
*/
@RabbitHandler
public void process(String msg){
System.out.println("Push..........receiver: "+msg);
}
}
/**
* 消息发送者
* @author Administrator
*
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交换器名称
@Value("${mq.config.exchange}")
private String exchange;
/*
* 发送消息的方法
*/
public void send(String msg){
//向消息队列发送消息
//参数一:交换器名称。
//参数二:路由键
//参数三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange,"", msg);
}
}