RabbitMQ 使用与详解
RabbitMQ参考中文文档
direct 把消息路由到那些binding key与routing key完全匹配的Queue中
topic
把消息路由到那些binding key与routing key模糊匹配的Queue中
匹配规则:
header
headers类型的Exchange不依赖于routingkey与binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers属性进行匹配。
使用docker运行,要使用管理页面用management的版本
docker run -d --name rabbitmq --publish 5671:5671 --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 rabbitmq:management
管理页面默认用户名和密码都是guest
点击Queues,Add a new queue
填入queue名称保存即可
点击Exchanges,Add a new exchange
输入Echange名称,选择type
保存即可
点击刚才创建的exchange,Bindings下面填入queue的名称和Routing Key即可
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mt.demo</groupId>
<artifactId>spring-boot-rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
application.yml
spring:
application:
name: rabbitmq-demo
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true #生产者可以判断消息是否发送到了broker
publisher-returns: true #生产者可以判断消息是否发送到了queue
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
server:
port: 10001
先在RabbitMQ管理页面上创建hello的队列,并且使用绑定到topic交换器上
创建一个消费者
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener {
@RabbitHandler
public void process(String hello) {
log.info("Receiver: {}", hello);
}
}
创建一个生产者
@GetMapping("/send")
public void send(@RequestParam String topic, @RequestParam String route, @RequestParam String msg) {
log.info("send topic[{}], msg: {}", topic, msg);
rabbitTemplate.convertAndSend(topic, route, msg);
}
如果再创建一个消费者绑定同样的队列,则可以看到两个消费者交替收到消息
@Slf4j
@Component
@RabbitListener(queues = "hello")
public class HelloListener2 {
@RabbitHandler
public void process(String hello) {
log.info("Receiver2: {}", hello);
}
}
如果再创建一个queue和前一个使用一样的bindingkey,则发送的消息会同是发送进两个queue
配置RabbitTemplate,加入消息确认机制回调
@Autowired
private ReturnCallBackListener returnCallBackListener;
@Autowired
private ConfirmCallbackListener confirmCallbackListener;
@Bean
public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(confirmCallbackListener);
/**
* 当mandatory标志位设置为true时
* 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
* 那么broker会调用basic.return方法将消息返还给生产者
* 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
*/
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallBackListener);
return rabbitTemplate;
}
ConfirmCallback: ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调
ReturnCallback:ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
完整代码参考GITHUB