文章目录
docker pull rabbitmq:3.7-management
docker run --name rabbitmq -p 15672:15672 -p 5672:5672 -d df80af9ca0c9
http://[ip]:15672
即可登录routing_key
方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的routing_key
,假设每个交换机上都绑定一堆的routing_key
连接到各个队列上。那么消息的管理就会异常地困难。
RabbitMQ
提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key
,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的routing_key
需要有一定的规则,交换机和队列的binding_key
需要采用*.#.*.....
的格式,每个部分用.
分开,其中:
*
表示一个单词rabbit.*
能够匹配到rabbit.new
rabbit.*
不能够匹配到rabbit.new.old
#
表示任意数量(零个或多个)单词。rabbit.#
能够匹配到rabbit.new
rabbit.#
能够匹配到rabbit.new.old
routing_key
为fast.rabbit.white
,那么带有这样binding_key
的几个队列都会接收这条消息Name
:交换机名称Durability
:是否持久化。如果持久性,则RabbitMQ重启后,交换机还存在Auto-delete
:当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它name
:名称durable
:是否持久化,如果不持久化,那么重启后将会不存在exclusive
:独享(只被一个连接(connection)使用,而且当连接关闭后队列即被删除) autoDelete
:自动删除,当最后一个消费者退订后即被删除 arguments
:其他<!-- rabbitmq启动器 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.0.86 ## 主机地址
spring.rabbitmq.port=5672 ## 端口
spring.rabbitmq.username=admin ## 用户名
spring.rabbitmq.password=123456 ## 密码
spring.rabbitmq.virtual-host=/ ## 虚拟主机,这里的用户名和密码一定要对这个虚拟主机有权限
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Topic交换机的配置类
* 1、配置完成之后,当使用到的时候会自动创建,不需要手动的创建,当然使用rabbitAdmin也是可以手动创建的
*/
@Configuration //指定这是一个配置类
public class TopicConfig {
/**
* 创建队列 queue1
* @return
*/
@Bean
public Queue queue1(){
return new Queue("queue_1",true);
}
/**
* 创建队列 queue2
* @return
*/
@Bean
public Queue queue2(){
//指定名称和持久化
return new Queue("queue_2",true);
}
/**
* 创建topic交换机
*/
@Bean
public TopicExchange topic1(){
return new TopicExchange("topic_1");
}
/**
* 将交换机topic1和队列queue1通过路邮键message_1绑定在一起
* @param topic1 交换机1 ,这里通过名称匹配,因为是通过@Bean自动注入的
* @param queue1 队列1 这里通过名称匹配,因为是通过@Bean自动注入的
* @return
*/
@Bean
public Binding bindTopic1AndQueu1(TopicExchange topic1,Queue queue1 ){
return BindingBuilder.bind(queue1).to(topic1).with("message_1");
}
/**
* 将交换机topic1和队列queue2通过路邮键message_2绑定在一起
* @param topic1
* @param queue1
* @return
*/
@Bean
public Binding bindTopic1AndQueu2(TopicExchange topic1,Queue queue2 ){
return BindingBuilder.bind(queue2).to(topic1).with("message_2");
}
}
@EnableRabbit
@EnableRabbit //开启rabbitmq
@SpringBootApplication
public class DemoApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
//继承SpringBootServletInitializer实现war包的发布
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(DemoApplication.class);
}
}
@Service
public class RabbitServiceImpl implements RabbitServie {
@Resource
private RabbitTemplate rabbitTemplate发送消息;
//使用rabbitTemplate发送消息
@Override
public void send() {
Map<String, Object> map=new HashedMap();
map.put("name", "陈加兵");
rabbitTemplate.convertAndSend("topic_1", "message_1", map);
}
//使用rabbitTemplate接收消息
@Override
public void get() {
Map<String, Object> map=(Map<String, Object>) rabbitTemplate.receiveAndConvert("queue_1");
System.out.println(map);
}
}
void convertAndSend(String exchange, String routingKey, final Object object)
:发送消息exchange
:交换机routingKey
:路由键object
:需要发送的对象Object receiveAndConvert(String queueName)
:接收指定队列的消息queueName
:消息队列的名字 @Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
@RunWith(SpringRunner.class)
@SpringBootTest // springBoot测试类,可以自定义测试类,不过需要引用这两个注解
public class RabbitMqTest {
@Resource
private AmqpAdmin amqpAdmin; //自动注入即可
@Test
public void test4() {
DirectExchange directExchange = new DirectExchange("test_direct");
Queue queue = new Queue("direct_1", true);
// 创建一个直连的交换机
amqpAdmin.declareExchange(directExchange);
// 创建一个队列
amqpAdmin.declareQueue(queue);
//创建绑定关系
amqpAdmin.declareBinding(BindingBuilder.bind(queue)
.to(directExchange).with("direct_message"));
}
}
@RabbitListener
,可以监听指定的队列,一旦这个队列中有消息了,那么就会执行@EnableRabbit
开启基于注解的rabbit的消息监听/**
* rabbitmq的消息处理类
* @author 陈加兵
*/
@Component //注入
public class MessageHandler {
/**
* 使用@RabbitListener这个注解监听指定的队列,一旦这个队列有了消息,那么将会执行
* @param log 消息的内容,如果接收的消息内容是log对象,那么将会被反序列化,存入这个log中
* 消息一旦被监听到了并且被执行了,那么这条队列的消息将会被删除了
*/
@RabbitListener(queues={"direct_1"})
public void received(Log log){
System.out.println("------接收到消息----");
System.out.println("消息内容为:"+log);
}
/**
* 使用org.springframework.amqp.core.Message对象来接收消息,可以显示消息头一些信息
* @param message
*/
@RabbitListener(queues={"direct_1"})
public void received1(Message message){
System.out.println("------接收到消息1----");
byte[] body=message.getBody();
System.out.println(message.getMessageProperties());
}
}
@RabbitListener
可以标注在类上面,需配合 @RabbitHandler 注解一起使用@RabbitListener
标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型/**
* 处理消息的类,使用@RabbitListener监听队列,结合@RabbitHandler处理不同内容类型的消息
* @author Administrator
*
*/
@RabbitListener(queues={"direct_1"}) //监听direct_1这个队列的消息
@Component //注入
public class RabbitMessage {
/**
* 定义处理的方法是接收内容的类型为Log类型
* @param log
*/
@RabbitHandler
public void receivedLog(Log log){
System.out.println("接收了log对象");
System.out.println(log);
}
/**
* 定义接收内容为User类型的消息
* @param user
*/
@RabbitHandler
public void receivedMap(User user){
System.out.println("接收了user对象");
System.out.println(user);
}
}
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK,开启之后只有手动提交才会消费消息
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
ConfirmCallback
: 这个是rabbitmq的确认的回调接口,当消息发送之后,会异步调用实现这个接口的方法ReturnCallback
:这个是rabbitmq的失败回调接口,当消息发送失败之后,会异步调用实现这个接口的方法/**
* 消息发送的业务层
* SendMessageService : 发送消息的接口
* ConfirmCallback : 消息发送成功的回调接口
* ReturnCallback : 消息发送失败的回调接口(找不到对应的路由或者因为各种原因消息没有成功投递到rabbitmq中都会出发回调)
* @author 陈加兵
* @since 2018年11月15日 下午4:45:37
*/
@Service
public class SendMessageServiceImpl implements SendMessageService,ConfirmCallback,ReturnCallback {
@Resource
private RabbitTemplate rabbitTemplate; //注入rabbitMq的template,用于发送和消费消息
private Logger logger=LoggerFactory.getLogger(SendMessageServiceImpl.class); //日志
/**
* 消息发送失败的回调方法,实现ReturnCallback接口的方法
* 1、消息没有投递成功,包括没有找到对应的队列或者路由键
*/
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
logger.info("返回的失败代码="+replyCode+" 返回的失败信息="+replyText);
logger.info("交换机="+exchange+" 绑定的路由键="+routingKey);
}
/**
* 消息发送确认的回调方法
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* 判断消息有没有成功发送,只需要判断ack的值,correlationData是发送消息的时候传递过来的值(String)
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
//如果ack==true,表示消息发送成功
if (ack) {
logger.info("消息发送成功,下面开始处理业务。。。。。。。。。。。。。。。");
logger.info("correlationData="+correlationData);
}else {
logger.info("消息发送失败。。。。。。。。。。。。。。。。");
logger.info("cause="+cause);
}
}
/**
* 发送消息的方法
*/
@Override
public void sendMessage(Log log) throws Exception {
rabbitTemplate.setConfirmCallback(this); //设置
rabbitTemplate.setReturnCallback(this);
CorrelationData data=new CorrelationData();
data.setId("success"); //定义内容,在消息发送成功的回调方法中可以获取这个值
rabbitTemplate.convertAndSend("amq.direct", "message", log,data); //发送消息
}
}
com.rabbitmq.client.Channel
channel.basicAck()
:手动ackdeliveryTag
:该消息的indexmultiple
:是否批量,如果为true
将一次性ack所有小于deliveryTag的消息,如果为false
,那么将ack当前的消息channel.basicNack(deliveryTag, multiple, requeue)
deliveryTag
:该消息的indexmultiple
:是否批量,如果为true
将一次性ack所有小于deliveryTag的消息,如果为false
,那么将ack当前的消息requeue
:被丢弃消息是否重新进入队列,如果是true将会重新进入队列import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import com.techwells.demo.domain.Log;
/**
* 监听队列direct_1
* @author 陈加兵
* @since 2018年11月15日 下午6:55:56
*/
@Component
@RabbitListener(queues={"direct_1"})
public class ReceivedMessageHandler {
private Logger logger=LoggerFactory.getLogger(ReceivedMessageHandler.class);
/**
* 接收消息类型为Log的消息
* 如果不手动提交的话,默认是不会被自动确认消费的,只有手动提交了,才会被真正的消费
* @param log 消息的实体类
* @param channel
* @param message rabbitmq的Message类
* @throws IOException
*/
@RabbitHandler //处理消息
public void handleMessage(Log log,Channel channel,Message message) throws IOException{
logger.info("成功接收到消息........"+log);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //手动提交ack,消费消息
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
logger.info("成功被消费。。。。。。。。。。");
}
/**
* 处理消息类型为String类型的消息
* @param str
* @param channel
* @param message
* @throws IOException
*/
@RabbitHandler //处理消息
public void handleStringMessage(String str,Channel channel,Message message) throws IOException{
logger.info("成功接收到消息........"+str);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //nack,不消费这条消息,一般是业务失败才会不消费
}
}
1、https://www.kancloud.cn/yunxifd/rabbitmq/96997
2、中文文档
3、https://www.cnblogs.com/ityouknow/p/6120544.html
4、事务
5、ACK