前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配)

RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配)

作者头像
别先生
发布2019-11-04 23:29:52
6400
发布2019-11-04 23:29:52
举报
文章被收录于专栏:别先生别先生

1、交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型,a、direct(发布与订阅 完全匹配)。b、fanout(广播)。c、topic(主题,规则匹配)。

2、direct(发布与订阅 完全匹配)的使用。

由于使用的是SpringBoot项目结合Maven项目构建的。项目工程如下所示:

3、生产者模块和消费者模块分开的,但是pom.xml是一致的,如下所示:

代码语言:javascript
复制
 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.1.1.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.bie</groupId>
14     <artifactId>rabbitmq-direct-provider</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>rabbitmq-direct-provider</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter-web</artifactId>
31         </dependency>
32         <dependency>
33             <groupId>org.springframework.boot</groupId>
34             <artifactId>spring-boot-starter-test</artifactId>
35             <scope>test</scope>
36         </dependency>
37         <dependency>
38             <groupId>org.springframework.boot</groupId>
39             <artifactId>spring-boot-starter-amqp</artifactId>
40         </dependency>
41     </dependencies>
42 
43     <build>
44         <plugins>
45             <plugin>
46                 <groupId>org.springframework.boot</groupId>
47                 <artifactId>spring-boot-maven-plugin</artifactId>
48             </plugin>
49         </plugins>
50     </build>
51 
52 </project>

配置生产者的配置文件application.properties。配置如下所示:

代码语言:javascript
复制
 1 # 给当前项目起名称.
 2 spring.application.name=rabbitmq-direct-provider
 3 
 4 # 配置端口号
 5 server.port=8081
 6 
 7 
 8 # 配置rabbitmq的参数.
 9 # rabbitmq服务器的ip地址.
10 spring.rabbitmq.host=192.168.110.133
11 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
12 spring.rabbitmq.port=5672
13 # rabbitmq的账号.
14 spring.rabbitmq.username=guest
15 # rabbitmq的密码.
16 spring.rabbitmq.password=guest
17 
18 # 设置交换器的名称,方便修改.
19 # 生产者和消费者的交换器的名称是一致的,这样生产者生产的消息发送到交换器,消费者可以从这个交换器中消费.
20 rabbitmq.config.exchange=log.exchange.direct
21 
22 # 生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器,交换器根据路由键将绑定到队列上.
23 # 交换器根据不同的路由键将消息发送到不同队列上.
24 # info的路由键.
25 rabbitmq.config.queue.info.routing.key=log.info.routing.key
26 
27 # error的路由键.
28 rabbitmq.config.queue.error.routing.key=log.error.routing.key

配置完毕,配置文件开始写生产者生产消息代码。

本模块练习的是发布订阅模式即Direct,分为两个生产者LogInfo、LogError,生产者生产消息的时候也要带上路由键,队列通过路由键绑定到交换器(即交换器根据路由键将绑定到队列上),交换器根据不同的路由键将消息发送到不同队列上。

本项目指定了info的路由键、error的路由键,然后生产者生产的消息发送到指定的交换器。交换器通过路由到绑定的队列中去,最后消费者进行监听队列发生变化,触发指定的方法进行消息的消费。

代码语言:javascript
复制
 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13  * 
14  *         这里使用的交换器类型使用的是direct发布订阅模式,
15  *         根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。
16  *         不同的消息具有相同的路由键,就会进入相同的队列当中去。
17  *
18  * 
19  */
20 @Component
21 public class RabbitMqLogInfoProduce {
22 
23     @Autowired
24     private AmqpTemplate rabbitmqAmqpTemplate;
25 
26     // 交换器的名称Exchange
27     @Value(value = "${rabbitmq.config.exchange}")
28     private String exchange;
29 
30     // 路由键routingkey
31     @Value(value = "${rabbitmq.config.queue.info.routing.key}")
32     private String routingKey;
33 
34     /**
35      * 发送消息的方法
36      * 
37      * @param msg
38      */
39     public void producer(String msg) {
40         // 向消息队列发送消息
41         // 参数1,交换器的名称
42         // 参数2,路由键
43         // 参数3,消息
44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
45     }
46 
47 }
代码语言:javascript
复制
 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13  * 
14  *         这里使用的交换器类型使用的是direct发布订阅模式,
15  *         根据配置的路由routing-key来决定,将不同的消息路由到不同的队列queue中。
16  *         不同的消息具有相同的路由键,就会进入相同的队列当中去。
17  *
18  * 
19  */
20 @Component
21 public class RabbitMqLogErrorProduce {
22 
23     @Autowired
24     private AmqpTemplate rabbitmqAmqpTemplate;
25 
26     // 交换器的名称Exchange
27     @Value(value = "${rabbitmq.config.exchange}")
28     private String exchange;
29 
30     // 路由键routingkey
31     @Value(value = "${rabbitmq.config.queue.error.routing.key}")
32     private String routingKey;
33 
34     /**
35      * 发送消息的方法
36      * 
37      * @param msg
38      */
39     public void producer(String msg) {
40         // 向消息队列发送消息
41         // 参数1,交换器的名称
42         // 参数2,路由键
43         // 参数3,消息
44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKey, msg);
45     }
46 }

这里使用web工程,浏览器访问调用,方便测试。

代码语言:javascript
复制
 1 package com.example.bie.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.example.bie.provider.RabbitMqLogErrorProduce;
 9 import com.example.bie.provider.RabbitMqLogInfoProduce;
10 
11 /**
12  * 
13  * @author biehl
14  *
15  */
16 @Controller
17 public class RabbitmqController {
18 
19     @Autowired
20     private RabbitMqLogInfoProduce rabbitMqLogInfoProduce;
21 
22     @Autowired
23     private RabbitMqLogErrorProduce rabbitMqLogErrorProduce;
24 
25     @RequestMapping(value = "/logInfo")
26     @ResponseBody
27     public String rabbitmqSendLogInfoMessage() {
28         String msg = "生产者===>生者的LogInfo消息message: ";
29         for (int i = 0; i < 100000; i++) {
30             rabbitMqLogInfoProduce.producer(msg + i);
31         }
32         return "生产===>  LogInfo消息message  ===> success!!!";
33     }
34 
35     @RequestMapping(value = "/logError")
36     @ResponseBody
37     public String rabbitmqSendLogErrorMessage() {
38         String msg = "生产者===>生者的LogError消息message: ";
39         for (int i = 0; i < 100000; i++) {
40             rabbitMqLogErrorProduce.producer(msg + i);
41         }
42         return "生产===>  LogError消息message  ===> success!!!";
43     }
44 
45 }

生产者的启动类,如下所示:

代码语言:javascript
复制
 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqProducerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqProducerApplication.class, args);
11     }
12 
13 }

4、生产者搞完以后,开始搞消费者。由于pom.xml配置文件一致,这里省略消费者的pom.xml配置文件。

代码语言:javascript
复制
 1 # 给当前项目起名称.
 2 spring.application.name=rabbitmq-direct-consumer
 3 
 4 # 配置端口号
 5 server.port=8080
 6 
 7 # 配置rabbitmq的参数.
 8 # rabbitmq服务器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
11 spring.rabbitmq.port=5672
12 # rabbitmq的账号.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密码.
15 spring.rabbitmq.password=guest
16 
17 # 设置交换器的名称,方便修改.
18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
19 rabbitmq.config.exchange=log.exchange.direct
20 
21 # info级别的队列名称.
22 rabbitmq.config.queue.info=log.info.queue
23 # info的路由键.
24 rabbitmq.config.queue.info.routing.key=log.info.routing.key
25 
26 # error级别的队列名称.
27 rabbitmq.config.queue.error=log.error.queue
28 # error的路由键.
29 rabbitmq.config.queue.error.routing.key=log.error.routing.key

消费者消费消息的编写,如下所示:

代码语言:javascript
复制
 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34 
35         key = "${rabbitmq.config.queue.error.routing.key}"))
36 public class LogErrorConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("ERROR消费者===>消费<===消息message: " + msg);
51     }
52 
53 }
代码语言:javascript
复制
 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34 
35         key = "${rabbitmq.config.queue.info.routing.key}"))
36 public class LogInfoConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("INFO消费者===>消费: " + msg);
51     }
52 
53 }

消费者启动主类如下所示:

代码语言:javascript
复制
 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqConsumerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqConsumerApplication.class, args);
11     }
12 
13 }

5、发布订阅模式,生产者生产消息,消费者消费消息,运行效果如下所示:

首先启动你消费者消费消息的启动类,再启动你的生产者生产消息的启动类。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-11-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档