专栏首页别先生RabbitMQ的交换器Exchange之Topic交换器(主题,规则匹配)

RabbitMQ的交换器Exchange之Topic交换器(主题,规则匹配)

1、Topic交换器(主题,规则匹配),Topic交换器也称为主题交换器,特点是根据规则进行匹配,可以根据模糊进行匹配(即根据路由key进行模糊匹配),决定将那个信息放入到指定的队列里面去。

项目的结构如下所示:

2、由于使用的是SpringBoot项目结合Maven项目构建的,pom.xml的配置文件,如下所示,生产者和消费者的配置文件一致,这里只贴一份了。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 5     https://maven.apache.org/xsd/maven-4.0.0.xsd">
 6     <modelVersion>4.0.0</modelVersion>
 7     <parent>
 8         <groupId>org.springframework.boot</groupId>
 9         <artifactId>spring-boot-starter-parent</artifactId>
10         <version>2.1.1.RELEASE</version>
11         <relativePath /> <!-- lookup parent from repository -->
12     </parent>
13     <groupId>com.bie</groupId>
14     <artifactId>rabbitmq-topic-provider</artifactId>
15     <version>0.0.1-SNAPSHOT</version>
16     <name>rabbitmq-topic-provider</name>
17     <description>Demo project for Spring Boot</description>
18 
19     <properties>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter-web</artifactId>
31         </dependency>
32         <dependency>
33             <groupId>org.springframework.boot</groupId>
34             <artifactId>spring-boot-starter-test</artifactId>
35             <scope>test</scope>
36         </dependency>
37         <dependency>
38             <groupId>org.springframework.boot</groupId>
39             <artifactId>spring-boot-starter-amqp</artifactId>
40         </dependency>
41     </dependencies>
42 
43     <build>
44         <plugins>
45             <plugin>
46                 <groupId>org.springframework.boot</groupId>
47                 <artifactId>spring-boot-maven-plugin</artifactId>
48             </plugin>
49         </plugins>
50     </build>
51 
52 </project>

3、配置好pom.xml配置文件,就可以进行开发了,这里先约束一下配置文件,体现一下SpringBoot的魔力,约定大于配置。

 1 # 给当前项目起名称.
 2 spring.application.name=rabbitmq-topic-provider
 3 
 4 # 配置端口号
 5 server.port=8081
 6 
 7 # 配置rabbitmq的参数.
 8 # rabbitmq服务器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
11 spring.rabbitmq.port=5672
12 # rabbitmq的账号.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密码.
15 spring.rabbitmq.password=guest
16 
17 # 设置交换器的名称,方便修改.
18 # 生产者和消费者的交换器的名称是一致的,这样生产者生产的消息发送到交换器,消费者可以从这个交换器中消费.
19 rabbitmq.config.exchange=log.exchange.topic

模拟三个服务,用户服务、商品服务,订单服务,产生的各种日志信息,包含info、debug、trace、warn、error日志信息。不同的日志级别信息指定好路由键,将发送的消息绑定到交换器上面,发送消息。

 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13  * 
14  *         这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15  *
16  */
17 @Component
18 public class RabbitMqUserLogProduce {
19 
20     @Autowired
21     private AmqpTemplate rabbitmqAmqpTemplate;
22 
23     // 交换器的名称Exchange
24     @Value(value = "${rabbitmq.config.exchange}")
25     private String exchange;
26 
27     // 路由键routingkey
28     private String routingKeyInfo = "user.log.info";
29     private String routingKeyDebug = "user.log.debug";
30     private String routingKeyTrace = "user.log.trace";
31     private String routingKeyWarn = "user.log.warn";
32     private String routingKeyError = "user.log.error";
33 
34     /**
35      * 发送消息的方法
36      * 
37      * @param msg
38      */
39     public void producer(String msg) {
40         // 向消息队列发送消息
41         // 参数1,交换器的名称
42         // 参数2,路由键
43         // 参数3,消息
44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "user.log.info......" + msg);
45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "user.log.debug......" + msg);
46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "user.log.trace......" + msg);
47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "user.log.warn......" + msg);
48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "user.log.error......" + msg);
49     }
50 
51 }
 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13  * 
14  *         这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15  *
16  */
17 @Component
18 public class RabbitMqProductLogProduce {
19 
20     @Autowired
21     private AmqpTemplate rabbitmqAmqpTemplate;
22 
23     // 交换器的名称Exchange
24     @Value(value = "${rabbitmq.config.exchange}")
25     private String exchange;
26 
27     // 路由键routingkey
28     private String routingKeyInfo = "product.log.info";
29     private String routingKeyDebug = "product.log.debug";
30     private String routingKeyTrace = "product.log.trace";
31     private String routingKeyWarn = "product.log.warn";
32     private String routingKeyError = "product.log.error";
33 
34     /**
35      * 发送消息的方法
36      * 
37      * @param msg
38      */
39     public void producer(String msg) {
40         // 向消息队列发送消息
41         // 参数1,交换器的名称
42         // 参数2,路由键
43         // 参数3,消息
44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "product.log.info......" + msg);
45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "product.log.debug......" + msg);
46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "product.log.trace......" + msg);
47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "product.log.warn......" + msg);
48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "product.log.error......" + msg);
49     }
50 
51 }
 1 package com.example.bie.provider;
 2 
 3 import org.springframework.amqp.core.AmqpTemplate;
 4 import org.springframework.beans.factory.annotation.Autowired;
 5 import org.springframework.beans.factory.annotation.Value;
 6 import org.springframework.stereotype.Component;
 7 
 8 /**
 9  * 
10  * @author biehl
11  * 
12  *         生产者,生产消息同样需要知道向那个交换器Exchange发送消息的.
13  * 
14  *         这里使用的交换器类型使用的是topic主题模式,根据规则匹配。
15  *
16  */
17 @Component
18 public class RabbitMqOrderLogProduce {
19 
20     @Autowired
21     private AmqpTemplate rabbitmqAmqpTemplate;
22 
23     // 交换器的名称Exchange
24     @Value(value = "${rabbitmq.config.exchange}")
25     private String exchange;
26 
27     // 路由键routingkey
28     private String routingKeyInfo = "order.log.info";
29     private String routingKeyDebug = "order.log.debug";
30     private String routingKeyTrace = "order.log.trace";
31     private String routingKeyWarn = "order.log.warn";
32     private String routingKeyError = "order.log.error";
33 
34     /**
35      * 发送消息的方法
36      * 
37      * @param msg
38      */
39     public void producer(String msg) {
40         // 向消息队列发送消息
41         // 参数1,交换器的名称
42         // 参数2,路由键
43         // 参数3,消息
44         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyInfo, "order.log.info......" + msg);
45         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyDebug, "order.log.debug......" + msg);
46         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyTrace, "order.log.trace......" + msg);
47         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyWarn, "order.log.warn......" + msg);
48         this.rabbitmqAmqpTemplate.convertAndSend(this.exchange, this.routingKeyError, "order.log.error......" + msg);
49     }
50 
51 }

这里使用web工程,浏览器访问调用,方便测试。你也可以使用单元测试的方法。

 1 package com.example.bie.controller;
 2 
 3 import org.springframework.beans.factory.annotation.Autowired;
 4 import org.springframework.stereotype.Controller;
 5 import org.springframework.web.bind.annotation.RequestMapping;
 6 import org.springframework.web.bind.annotation.ResponseBody;
 7 
 8 import com.example.bie.provider.RabbitMqOrderLogProduce;
 9 import com.example.bie.provider.RabbitMqProductLogProduce;
10 import com.example.bie.provider.RabbitMqUserLogProduce;
11 
12 /**
13  * 
14  * @author biehl
15  *
16  */
17 @Controller
18 public class RabbitmqController {
19 
20     @Autowired
21     private RabbitMqUserLogProduce rabbitMqUserLogProduce;
22 
23     @Autowired
24     private RabbitMqProductLogProduce rabbitMqProductLogProduce;
25 
26     @Autowired
27     private RabbitMqOrderLogProduce rabbitMqOrderLogProduce;
28 
29     @RequestMapping(value = "/userLogInfo")
30     @ResponseBody
31     public String rabbitmqSendUserLogInfoMessage() {
32         String msg = "生产者===>生者的UserLogInfo消息message: ";
33         for (int i = 0; i < 50000; i++) {
34             rabbitMqUserLogProduce.producer(msg + i);
35         }
36         return "生产===>  UserLogInfo消息message  ===> success!!!";
37     }
38 
39     @RequestMapping(value = "/productLogInfo")
40     @ResponseBody
41     public String rabbitmqSendProductLogErrorMessage() {
42         String msg = "生产者===>生者的ProductLogInfo消息message: ";
43         for (int i = 0; i < 50000; i++) {
44             rabbitMqProductLogProduce.producer(msg + i);
45         }
46         return "生产===>  ProductLogInfo消息message  ===> success!!!";
47     }
48 
49     @RequestMapping(value = "/orderLogInfo")
50     @ResponseBody
51     public String rabbitmqSendOrderLogInfoMessage() {
52         String msg = "生产者===>生者的OrderLogInfo消息message: ";
53         for (int i = 0; i < 50000; i++) {
54             rabbitMqOrderLogProduce.producer(msg + i);
55         }
56         return "生产===>  OrderLogInfo消息message  ===> success!!!";
57     }
58 
59 }

生产者的启动类如下所示:

 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqProducerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqProducerApplication.class, args);
11     }
12 
13 }

4、生产者开发完毕就可以进行消费者的开发,也是先约束一下配置文件application.properties。

 1 # 给当前项目起名称.
 2 spring.application.name=rabbitmq-topic-consumer
 3 
 4 # 配置端口号
 5 server.port=8080
 6 
 7 # 配置rabbitmq的参数.
 8 # rabbitmq服务器的ip地址.
 9 spring.rabbitmq.host=192.168.110.133
10 # rabbitmq的端口号5672,区别于浏览器访问界面的15672端口号.
11 spring.rabbitmq.port=5672
12 # rabbitmq的账号.
13 spring.rabbitmq.username=guest
14 # rabbitmq的密码.
15 spring.rabbitmq.password=guest
16 
17 # 设置交换器的名称,方便修改.
18 # 路由键是将交换器和队列进行绑定的,队列通过路由键绑定到交换器.
19 rabbitmq.config.exchange=log.exchange.topic
20 
21 # info级别的队列名称.
22 rabbitmq.config.queue.info=log.info.queue
23 
24 # error级别的队列名称.
25 rabbitmq.config.queue.error=log.error.queue
26 
27 # 全日志log级别的队列名称.
28 rabbitmq.config.queue.log=log.all.queue

约束好配置文件就可以进行消费者的开发了,这里是将用户服务、商品服务、订单服务产生的info、debug、trace、warn、error不同级别的日志信息,使用rabbitmq的主题topic模式进行规则配置,即,消费者可以专一消费info级别的消息,error级别的消息,或者全部级别的日志消息。

 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34 
35         key = "*.log.info"))
36 public class LogInfoConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("All消费者===>消费: " + msg);
51     }
52 
53 }
 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34 
35         key = "*.log.error"))
36 public class LogErrorConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("Error消费者===>消费: " + msg);
51     }
52 
53 }
 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.log}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.TOPIC),
34 
35         key = "*.log.*"))
36 public class LogAllConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("Info消费者===>消费: " + msg);
51     }
52 
53 }

消费者的启动类,如下所示:

 1 package com.example;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 @SpringBootApplication
 7 public class RabbitmqConsumerApplication {
 8 
 9     public static void main(String[] args) {
10         SpringApplication.run(RabbitmqConsumerApplication.class, args);
11     }
12 
13 }

5、运行效果如下所示:

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ的交换器Exchange之fanout交换器(广播)

    1、Fanout交换器(广播),以广播的模式进行消息的传递。广播模式一定没有路由键的存在,将消息从路由器发送到所有绑定的队列中去(即消息会发送到所有和指定路由器...

    别先生
  • RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配)

    1、交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。三种常用的交换器类型,a、direct(发布与订阅 完全匹配)。b、fanout(广播)。...

    别先生
  • RabbitMQ与Spring的框架整合之Spring AMQP实战

      RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可。注意,autoStartup必须设置为true,否则Spring容...

    别先生
  • 传统Spring项目使用FeignClient组件访问微服务

    传统Spring项目使用 这里的传统 Spring项目指的是没有使用 spring boot的 spring项目,例如 ssm api 文件 和在spring ...

    双鬼带单
  • springboot的Web开发-Web相关配置

    一:Spring Boot提供自动配置        通过查看WebMvcAutoConfiguration及WebMvcProperties的源码,可以发现S...

    庞小明
  • 小程序性能优化总结

    在小程序启动时,微信会在背后完成几项工作:下载小程序代码包、加载小程序代码包、初始化小程序首页。 初始化小程序环境是微信环境做的工作,我们只需要控制代码包大小,...

    张炳
  • Swagger-Springboot-mybatis-mysql

    此项目是我闲着的时候,整理出来的。因为我主要是做后端的,接口可能测试比较麻烦,所以就想起来了使用Swagger来管理API,然后还起到了测试的作用,还省得费劲去...

    TrueDei
  • Springboot - Async 异步任务

    用户5927264
  • springboot任务之异步任务

    此时我们启动服务器,并输出localhost:8080/hello,会在3s之后响应的success。

    绝命生
  • springBoot学习(三)springBoot事件监听和部分注解的运用

    1.springBoot启动类会使用@SpringBootApplication 2.点进入源代码发现改注解是一个复合注解,由好几个注解共同组合而成

    杨小杰

扫码关注云+社区

领取腾讯云代金券