本文介绍RabbitMq各个消息类型,以及用使用Fanout 类型进行消息的发送和消费,让大家对RabbitMq有一个简单的认识。
使用所有框架和中间件的版本
框架 | 版本 |
---|---|
Spring Boot | 2.1.5.RELEASE |
RabbitMq | 3.7.15 |
JDK | 1.8.0_144 |
Erlang | 22.0.2 |
RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
如图A、B、C都绑定在同一个交换器上
此类型exchange和direct类型差不多,但direct类型要求routingkey完全相等,有局限性,但是在Topic这里的routingkey可以有通配符:'*','#'. ,相比Direct类型来说就放宽了命中的队列,对Direct 类型进行了扩展。 其中'*'表示仅匹配一个单词, '#'则表示匹配零个或一个和或者多个单词。例如 “abc.#” 能匹配“abc.def.ghi” 和 “abc.def” ,但是 “abc.*” 只能匹配到 “abc.def”
如图 A、B、C都绑定在同一个交换器上
广播类型没有路由key的概念,因为它的原理是把生产者要发送到队列里的数据给存在当前信道里的每一个队列都发一份,一模一样的复制到每个队列,也就是说,只要是当前交换器绑定的队列就都可以收到消息。
如图A、B、C都绑定在同一个交换器上
Headers方式是在绑定对列的时候将匹配的条件以字典型数据当参数传入,然后在生产的时候再将要匹配的条件以字典型数据当参数传入来进行匹配。此类型基本不会有用到,不做讲解。
以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。
读前须知: 一般的,我们使用分布式或微服务的形式开发,所有的服务都会独立出来,为了让各个服务降低耦合性,我们一般会把Model单独拉出来,让各个服务依赖他,如果加了中间件服务的话,也会让我们中间件服务依赖 model,达到与业务服务解耦。
所以本文案例创建了三个工程,分别是一个商品服务、消费者、模型。
既然Spring已经为我们封装好了API 那我们就用他们封装的,省的自己还要创建连接和信道,代码好多,好烦,如果有个性化设置需要自己设置参数的,以后的章节我们会介绍。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=yanlin
spring.rabbitmq.password=yanlin
@RestController
public class GoodsController {
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/test/{id}")
public String goodsTest(@PathVariable Integer id) {
Order order = new Order(id, "xx");
amqpTemplate.convertAndSend("GOODS", "", order);
System.out.println(order);
return "成功";
}
}
@Configuration
public class GoodsChangeListener {
private static final String queue = "goods-change";
@Bean
public Queue goodsChangeQueue() {
return new Queue(queue);
}
@Bean
FanoutExchange goodsChangeExchange() {
return new FanoutExchange("GOODS");// 配置广播路由器
}
//交换器绑定队列
@Bean
Binding bindingExchangeGoodsChange(Queue goodsChangeQueue, FanoutExchange goodsChangeExchange) {
return BindingBuilder.bind(goodsChangeQueue).to(goodsChangeExchange);
}
}
@Service
public class GoodsConsumer {
@RabbitListener(queues = "goods-change")
@RabbitHandler
public void goodsChange( Order order) {
System.out.println("消费消息,商品名:" + order.getName());
}
}
需要注意的是,大家在发送消息体的时候如果传递的是对象,那么需要注意你必须要实现序列化接口生成序列化ID,而且对象需要时同一个,类名一样不叫同一个对象,因为内存地址是不同的,所以前面说到要把model单独拉出来。否则会报错,笔者亲身测试。
正确输出内容:
这样,一个简单的生产和消费就完成了。
项目如图
github地址 https://github.com/362460453/rabbitMQ-demo