备份交换器也叫备胎交换器,顾名思义就是,替代现任的,分手后可以及时上位,让你的爱情持续下去,原理就是我爱你换不来你爱我,那我就去找备胎了。回到消息里就很好理解了,生产者发送消息,由于路由错误不能到达指定队列,所以就路由到备胎队列消费,这样做可以保证未被路由的消息不会丢失,其实保证消息不会丢失还可以通过消息的回调方法,添加ReturnListener的编程逻辑,但是这样做生产者的代码会复杂写,所以我们使用备份交换器实现。
在实现之前我要介绍一个重要的参数
mandatory翻译成“强制的”,参数设置 true 时,交换器无法根据自身类型和路由键找到一个符合条件的队列,那么该消息会返回给生产者。当参数设置 false 时,出现上述情况,消息会被丢弃。
把参数设置 true,然后添加ReturnListener的编程逻辑就可以通过消息回调方式保证消息的可靠性。
下面介绍第二种方法,可以通过在声明交换器的时候设置 alternate-exchange 参数来实现,这样处理代码逻辑更简洁,本文的案例也是通过这种方式实现的。
开始之前让大家理解更直观,我画来一个图
备份交换器和普通交换器一样,建议设置为 fanout 类型,你也可以设置direct或者topic,但是重新发送给备份交换器时路由键也要匹配上,我们设置备份交换器就是说在路由键没有匹配上才有意义,所以没必要设置direct或者topic。
使用所有框架和中间件的版本
框架 | 版本 |
---|---|
Spring Boot | 2.1.5.RELEASE |
RabbitMq | 3.7.15 |
JDK | 1.8.0_144 |
Erlang | 22.0.2 |
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=yanlin
spring.rabbitmq.password=yanlin
@RestController
public class AEController {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/ae/{id}")
public String AETest(@PathVariable Integer id) {
Order order = new Order(id, "现任");
amqpTemplate.convertAndSend("TestAE", "abc", order);
order.setName("备胎");
amqpTemplate.convertAndSend("TestAE", "a", order);
System.out.println(order);
return "成功";
}
}
@Configuration
public class AEListener {
//一个正常对队列
private static final String queue = "good-queue";
//备份对队列
private static final String AEQueue = "AE-queue";
@Bean
public Queue goodQueue() {
return new Queue(queue);
}
@Bean
public Queue AEQueue() {
return new Queue(AEQueue);
}
@Bean
public DirectExchange goodExchange() {
Map<String, Object> arguments = new HashMap<>();
//如果设置此参数,TestAE交换器绑定的路由没有匹配到就会发送到exchange-unroute交换器所绑定的队列进行消费
arguments.put("alternate-exchange", "exchange-unroute");
return new DirectExchange("TestAE", true, false, arguments);
}
@Bean
public FanoutExchange unRouteExchange() {
// 此处的交换器的名字要和 exchange() 方法中 alternate-exchange 参数的值一致
return new FanoutExchange("exchange-unroute");
}
//被路由到的 交换器绑定队列
@Bean
Binding bindingExchangeGood(Queue goodQueue, DirectExchange goodExchange) {
return BindingBuilder.bind(goodQueue()).to(goodExchange()).with("abc");
}
//未被路由到的 交换器绑定队列
@Bean
Binding bindingExchangeAE(Queue AEQueue, FanoutExchange unRouteExchange) {
return BindingBuilder.bind(AEQueue).to(unRouteExchange);
}
}
@Service
public class AEConsumer {
@RabbitListener(queues = "good-queue")
@RabbitHandler
public void good(Order order) {
System.out.println("成功路由到,这是:" + order.getName());
}
@RabbitListener(queues = "AE-queue")
@RabbitHandler
public void AE(Order order) {
System.out.println("错误路由到,这是:" + order.getName());
}
}
启动生产者和消费者,调用接口
这样就实现了成功路由和备份交换器的路由
注意以下情况
如果设置的备份交换器不存在、备份交换器没有绑定任何队列、备份交换器绑定了队列但是没有匹配到,以上三种情况,客户端和服务端都不会出现异常,消息会丢失。
如果配置了 mandatory 参数和备份交换器一起用,那么参数无效。
github地址 https://github.com/362460453/rabbitMQ-demo