异步处理:
应用解耦:
流量削峰:
核心概念 | |
---|---|
Message | 消息 |
Publisher | 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 |
Exchange | 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 |
Queue | 消息队列,用来保存消息直到发送给消费者。 |
Binding | 绑定,用于消息队列和交换器之间的关联。 |
Connection | 网络连接,比如一个 TCP 连接。 |
Channel | 信道,多路复用连接中的一条独立的双向数据流通道。 |
Consumer | 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 |
Virtual Host | 虚拟主机,表示一批交换器、消息队列和相关对象。 |
Broker | 表示消息队列服务器实体 |
图示:
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别, AMQP 中增加了Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange 分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、 fanout、 topic、 headers 。 headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。 fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。 fanout 类型转发消息是最快的 。
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*” 。 #匹配 0 个或多个单词, *匹配一个单词。
// 选择3-mansgement 带web管理界面
docker pull rabbitmq:3-management
docker run -d -p 5672:5672 -p 15672:15672 --name mybebbitmq rabbitmq:3-management
idea 创建工程
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.host=192.168.64.129
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
@Autowired
private AmqpAdmin amqpAdmin;
@Test
void createExchange() {
// 创建exchange
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("创建完成");
// 创建Queue
amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
// 创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE, "amqpadmin.exchange", "amqpadmin.queue", null));
}
发送:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test1(){
// Message 需要自己构建一个;定义消息体内容和消息头
// rabbitTemplate.send(exchange,routeKey, message);
// object默认当成消息体,只需要传入要发送的消息,自动序列化发送给rabbitmq;
// rabbitTemplate.convertAndSend(exchange, object);
rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue","测试:test.msg");
}
接收:
// 接收数据
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("amqpadmin.queue");
System.out.println(o.getClass());
System.out.println(o);
}
第二种发送,以 Json 方式
// 第二种发送,以Json方式
@Test
public void test1(){
// 以Json 数据格式发送
HashMap<String, Object> map = new HashMap<>();
map.put("msg","这是第一条消息");
map.put("data",Arrays.asList("helloworld",123,true));
rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue", map);
// rabbitTemplate.convertAndSend("amqpadmin.exchange","amqpadmin.queue",new Book("西游记","吴承恩"));
}
发送的 Json 数据被序列化,没有显示正常的 json 数据格式,解决方式:自定义序列方式采用 JSON
@Configuration
public class MyAMQConfig {
// 自定义序列的为Json 格式
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
// 先创建Exchange
amqpAdmin.declareExchange(new FanoutExchange("amqpadmin.fanout"));
// 绑定
Binding.DestinationType.QUEUE,"amqpadmin.fanout","amqpadmin.queue",null));
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("amqpadmin.fanout","",new Book("红楼梦","曹雪芹"));
}
@Service
public class BookService {
@RabbitListener(queues = "amqpadmin.queue")
public void receive(Book book){
System.out.println("收到消息:" + book);
}
@RabbitListener(queues = "amqpadmin.queue")
public void receive2(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}