这是一个基于JVM的消息代理的规范,但是他只是一个规范真正的实现则是 ActiveMQ和HornetMQ等等
高级消息队列,他也是消息队列的规范但是他是兼容JMS的,他的具体实现就是我们常常听到的RabbitMQ
spring-jms提供了对JMS的支持 spring-rabbit提供了对AMQP的支持 需要ConnectionFactory的实现来连接消息代理 提供JmsTemplate、Rabbit Template来发送消息 @JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发 布的消息 @EnableJms、@EnableRabbit开启支持
JmsAutoConfiguration RabbitAutoConfiguration
首先我们产生消息的我们叫做 生产者(Publisher)
然后我们生产者会把消息发送给 我们的消息服务器(Broker)
中的一个虚拟主机(Virtual Host)
,虚拟主机中有一个专门用来接受生产者消息的组件就是 交换机Exchange
,在接收到消息以后我们的路由器就和我们真正的某条 消息队列Queue
通过路由键连接起来,这里的消息队列有多个,然后消息队列与路由器的链接也是多对多的。最后就是我们的消费者去消息队列中消费消息,采用的就是建立TCP链接,但是我们为了节省资源开销,采用了一条TCP的多路复用,也就是在一个TCP中开辟了多个信道来传输数据。
12 | docker pull registry.docker-cn.com/library/rabbitmq:3.7-management #获取具有web管理面板的镜像+使用加速docker run -d -p 5672:5672 -p 8002:15672 --name rabbitmq_dev c51d1c73d028 |
---|
输入 ip:15672 输入 guest/guest 登录进入管理界面
1234 | <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency> |
---|
1234567891011121314151617181920212223242526272829303132333435363738 | @Configuration@ConditionalOnClass({ RabbitTemplate.class, Channel.class })@EnableConfigurationProperties(RabbitProperties.class)@Import(RabbitAnnotationDrivenConfiguration.class)public class RabbitAutoConfiguration { @Configuration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator { @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config){} } @Configuration @Import(RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration{ @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitTemplate.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {} @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix="spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean(AmqpAdmin.class) public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {} } @Configuration @ConditionalOnClass(RabbitMessagingTemplate.class) @ConditionalOnMissingBean(RabbitMessagingTemplate.class) @Import(RabbitTemplateConfiguration.class) protected static class MessagingTemplateConfiguration { @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate{} }} |
---|
可以看到我们整个的自动配置类其实是我们的三个自动配置类的组合,在这个配置类里面我们配置了链接工厂,以及两个供我们便捷操作RabbitMQ的两个Template 这个和 redis 以及 jdbc 的 template 非常的类似。
这个bean就是配置了他的配置信息,包括地址端口什么的,这些可以看到是从我们的 RabbitProperties
中获取的,也就是 @EnableConfigurationProperties(RabbitProperties.class)
导入的 properties ,我们点进去就可以看到它对应的配置项以及关联的配置文件的前缀。其中前缀就是 “spring.rabbitmq” 下面就是具体的配置了。
1234567 | @ConfigurationProperties(prefix = "spring.rabbitmq")public class RabbitProperties { private String host = "localhost"; private int port = 5672; private String username; //....} |
---|
那么我们就在配置文件中填写配置项:
1234567 | spring: rabbitmq: host: 219.245.18.94 username: guest password: guest virtual-host: / port: 5672 |
---|
这个就是操作RabbitMQ的接口组件。
而这个是RabbitMQ的系统管理组件
消息相关的操作接口。
123456789101112131415161718192021222324 | @Testpublic void rabbitSend(){ //自定义消息头和消息体形式 rabbitTemplate.send("lwen.direct", "lwen.q1", new Message("hello".getBytes(),new MessageProperties())); //简单一点直接传输消息体,并且自动进行序列化 对象默认使用jdk序列化 HashMap<String, Object> msg = new HashMap<>(); msg.put("m1", "hello"); msg.put("m2", Arrays.asList("hello", "world")); rabbitTemplate.convertAndSend("lwen.direct", "lwen.q1", msg);}@Testpublic void receiveMsg(){ //获取消息进行反序列化 Object q1 = rabbitTemplate.receiveAndConvert("q1"); System.out.println(q1.getClass()); System.out.println(q1);} |
---|
上面就是简单的读写消息队列的操作。
以上我们看到的是单播的例子,其实广播以及模式匹配我们只是使用了不同的exchange就能达到目的。
可以看到上面的自动序列化出来的东西我们是没办法看的,我们更希望使用json等序列化工具。我们看一下他的序列化的配置。
在 public RabbitTemplate rabbitTemplate()
方法中 可以看到 MessageConverter messageConverter = this.messageConverter.getIfUnique();
就是获取消息转换的工具。
可以看到有这么多 MessageConverter 的具体实现,我们就是使用json的。
1234567 | @Configurationpublic class AMQPConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); }} |
---|
之后我们可以在RabbitMQ的管理面板上看到这种格式的数据了,而不是二进制:
1 | {"m1":"hello","m2":["hello","world"]} |
---|
我们对一些应用解耦的话我们就需要使用消息队列,那么消息队列就需要有通知的功能,这里我们只需要两个注解就能搞定这个问题。
12345678 | @SpringBootApplication@EnableRabbitpublic class BootMqApplication { public static void main(String[] args) { SpringApplication.run(BootMqApplication.class, args); }} |
---|
1234567 | @Servicepublic class BookService { @RabbitListener(queues = "q1") public void bookReceive(Book book){ System.out.println(book); }} |
---|
前面我们看到在自动配置的时候他们帮我们创建了一个 amqpAdmin 这个组件,然后我们是可以拿这个组件进行创建路由、队列、以及绑定规则,也就是我们在管理面板上能完成的操作。
123456789101112131415 | @AutowiredAmqpAdmin amqpAdmin@Testpublic void rabbitAdmins(){ //创建路由 DirectExchange 是Exchange的实现类 类自己找 有很多 amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange")); //创建队列 amqpAdmin.declareQueue(new Queue("amqpAdmin.queue")); //绑定 amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE, "amqpAdmin.exchange", "amqp.key", null));} |
---|