第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费

我们在之前的两个章节第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费提高了RabbitMQ消息队列的DirectExchange交换类型的消息消费,我们之前的章节提到了RabbitMQ比较常用的交换类型有三种,我们今天来看看TopicExchange主题交换类型。

本章目标

基于SpringBoot平台完成RabbitMQTopicExchange消息类型交换。

SpringBoot 企业级核心技术学习专题

专题

专题名称

专题描述

001

Spring Boot 核心技术

讲解SpringBoot一些企业级层面的核心组件

002

Spring Boot 核心技术章节源码

Spring Boot 核心技术简书每一篇文章码云对应源码

003

Spring Cloud 核心技术

对Spring Cloud核心技术全面讲解

004

Spring Cloud 核心技术章节源码

Spring Cloud 核心技术简书每一篇文章对应源码

005

QueryDSL 核心技术

全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA

006

SpringDataJPA 核心技术

全面讲解SpringDataJPA核心技术

解决问题

之前少年也遇到了一个问题,分类了多模块后消息队列无法自动创建,说来也好笑,之前没有时间去看这个问题,今天在编写本章文章时发现原因竟然是SpringBoot没有扫描到common模块内的配置类。让我一阵的头大~~~,我们在XxxApplication启动类上添加@ComponentScan(value = "com.hengyu.rabbitmq")就可以自动创建队列了!!!

构建项目

本章构建项目时同样采用多模块的方式进行设计,可以很好的看到消息处理的效果,因为是多模块项目,我们先来创建一个SpringBoot项目,pom.xml配置文件依赖配置如下所示:

<dependencies>
        <!--rabbbitMQ相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>

下面我们先来构建公共RabbitMQ模块,因为我们的消费者以及生产者都是需要RabbitMQ相关的配置信息,这里我们可以提取出来,使用时进行模块之间的引用。

rabbitmq-topic-common

创建子模块rabbitmq-topic-common,在resources下添加application.yml配置文件并配置RabbitMQ相关的依赖配置,如下所示:

spring:
  #rabbitmq消息队列配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirms: true
定义交换配置信息

我们跟之前的章节一张,独立编写一个枚举类型来配置消息队列的交换信息,如下所示:

/**
 * rabbitmq交换配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用户注册交换配置枚举
     */
    USER_REGISTER_TOPIC_EXCHANGE("register.topic.exchange")
    ;
    private String name;

    ExchangeEnum(String name) {
        this.name = name;
    }
}
定义队列配置信息

同样消息队列的基本信息配置也同样采用枚举的形式配置,如下所示:

/**
 * 队列配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用户注册
     * 创建账户消息队列
     */
    USER_REGISTER_CREATE_ACCOUNT("register.account","register.#"),
    /**
     * 用户注册
     * 发送注册成功邮件消息队列
     */
    USER_REGISTER_SEND_MAIL("register.mail","register.#")
    ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}

消息队列枚举内添加了两个属性,分别对应了队列名称队列路由,我们本章所讲解的TopicExchange类型消息队列可以根据路径信息配置多个消息消费者,而转发的匹配规则信息则是我们定义的队列的路由信息。

定义发送消息路由信息

我们在发送消息到队列时,需要我们传递一个路由相关的配置信息,RabbitMQ会根据发送时的消息路由规则信息与定义消息队列时的路由信息进行匹配,如果可以匹配则调用该队列的消费者完成消息的消费,发送消息路由信息配置如下所示:

/**
 * 消息队列topic交换路由key配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum TopicEnum {
    /**
     * 用户注册topic路由key配置
     */
    USER_REGISTER("register.user")
    ;

    private String topicRouteKey;

    TopicEnum(String topicRouteKey) {
        this.topicRouteKey = topicRouteKey;
    }
}
路由特殊字符 #

我们在QueueEnum内配置的路由键时有个特殊的符号:#,在RabbitMQ消息队列内路由配置#时表示可以匹配零个或多个字符,我们TopicEnum枚举内定义的register.user,则是可以匹配QueueEnum枚举定义register.#队列的路由规则。 当然发送消息时如果路由传递:register.user.account也是可以同样匹配register.#的路由规则。

路由特殊字符 *

除此之外比较常用到的特殊字符还有一个*,在RabbitMQ消息队列内路由配置*时表示可以匹配一个字符,我们QueueEnum定义路由键如果修改成register.*时,发送消息时路由为register.user则是可以接受到消息的。但如果发送时的路由为register.user.account时,则是无法匹配该消息。

消息队列配置

配置准备工作已经做好,接下来我们开始配置队列相关的内容,跟之前一样我们需要配置QueueExchangeBinding将消息队列与交换绑定。下面我们来看看配置跟之前的章节有什么差异的地方,代码如下所示:

/**
 * 用户注册消息队列配置
 * ========================
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {

    private Logger logger = LoggerFactory.getLogger(UserRegisterQueueConfiguration.class);
    /**
     * 配置用户注册主题交换
     * @return
     */
    @Bean
    public TopicExchange userTopicExchange()
    {
        TopicExchange topicExchange = new TopicExchange(ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE.getName());
        logger.info("用户注册交换实例化成功。");
        return topicExchange;
    }

    /**
     * 配置用户注册
     * 发送激活邮件消息队列
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue sendRegisterMailQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_SEND_MAIL.getName());
        logger.info("创建用户注册消息队列成功");
        return queue;
    }

    /**
     * 配置用户注册
     * 创建账户消息队列
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue createAccountQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getName());
        logger.info("创建用户注册账号队列成功.");
        return queue;
    }

    /**
     * 绑定用户发送注册激活邮件队列到用户注册主题交换配置
     * @return
     */
    @Bean
    public Binding sendMailBinding(TopicExchange userTopicExchange,Queue sendRegisterMailQueue)
    {
        Binding binding = BindingBuilder.bind(sendRegisterMailQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_SEND_MAIL.getRoutingKey());
        logger.info("绑定发送邮件到注册交换成功");
        return binding;
    }

    /**
     * 绑定用户创建账户到用户注册主题交换配置
     * @return
     */
    @Bean
    public Binding createAccountBinding(TopicExchange userTopicExchange,Queue createAccountQueue)
    {
        Binding binding = BindingBuilder.bind(createAccountQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getRoutingKey());
        logger.info("绑定创建账号到注册交换成功。");
        return binding;
    }
}

我们从上面开始分析。 第一步: 首先我们创建了TopicExchange消息队列对象,使用ExchangeEnum枚举内的USER_REGISTER_TOPIC_EXCHANGE类型作为交换名称。

第二步:我们创建了发送注册邮件的队列sendRegisterMailQueue,使用QueueEnum枚举内的类型USER_REGISTER_SEND_MAIL作为队列名称。

第三步:与发送邮件队列一致,用户创建完成后需要初始化账户信息,而createAccountQueue消息队列后续逻辑就是来完成该工作,使用QueueEnum枚举内的USER_REGISTER_CREATE_ACCOUNT枚举作为创建账户队列名称。

第四步:在上面步骤中已经将交换、队列创建完成,下面就开始将队列绑定到用户注册交换,从而实现注册用户消息队列消息消费,sendMailBinding绑定了QueueEnum.USER_REGISTER_SEND_MAILRoutingKey配置信息。

createAccountBinding绑定了QueueEnum.USER_REGISTER_CREATE_ACCOUNTRoutingKey配置信息。

到目前为止我们完成了rabbitmq-topic-common模块的所有配置信息,下面我们开始编写用户注册消息消费者模块。

rabbitmq-topic-consumer

我们首先来创建一个子模块命名为rabbitmq-topic-consumer,在pom.xml配置文件内添加rabbitmq-topic-common模块的引用,如下所示:

....//
<dependencies>
        <!--公共模块依赖-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-topic-common</artifactId>
            <version>${parent.version}</version>
        </dependency>
    </dependencies>
....//
消费者程序入口

下面我们来创建程序启动类RabbitMqTopicConsumerApplication,在这里需要注意,手动配置下扫描路径@ComponentScan,启动类代码如下所示:

/**
 * 消息消费者程序启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicConsumerApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicConsumerApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicConsumerApplication.class,args);

        logger.info("【【【【【Topic队列消息Consumer启动成功】】】】】");
    }
}

手动配置扫描路径在文章的开始解释过了,主要目的是为了扫描到RabbitMQConfiguration配置类内的信息,让RabbitAdmin自动创建配置信息到server端。

发送邮件消费者

发送邮件消息费监听register.mail消息队列信息,如下所示:

/**
 * 发送用户注册成功邮件消费者
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:07
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.mail")
public class SendMailConsumer
{

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(SendMailConsumer.class);

    /**
     * 处理消息
     * 发送用户注册成功邮件
     * @param userId 用户编号
     */
    @RabbitHandler
    public void handler(String userId)
    {

        logger.info("用户:{},注册成功,自动发送注册成功邮件.",userId);

        //... 发送注册成功邮件逻辑
    }
}

在这里我只是完成了消息的监听,具体的业务逻辑可以根据需求进行处理。

创建账户消费者

创建用户账户信息消费者监听队列register.account,代码如下所示:

/**
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:04
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.account")
public class CreateAccountConsumer {

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(CreateAccountConsumer.class);

    /**
     * 处理消息
     * 创建用户账户
     * @param userId 用户编号
     */
    @RabbitHandler
    public void handler(String userId)
    {
        logger.info("用户:{},注册成功,自动创建账户信息.",userId);

        //... 创建账户逻辑
    }
}

创建账户,账户初始化逻辑都可以在handler方法进行处理,本章没有做数据库复杂的处理,所以没有过多的逻辑处理在消费者业务内。

rabbitmq-topic-provider

接下来是我们的消息提供者的模块编写,我们依然先来创建程序入口类,并添加扫描配置@ComponentScan路径,代码如下所示:

/**
 * 消息生产者程序启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicProviderApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicProviderApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicProviderApplication.class,args);

        logger.info("【【【【【Topic队列消息Provider启动成功】】】】】");
    }
}
定义消息发送接口

创建QueueMessageService队列消息发送接口并添加send方法,如下所示:

/**
 * 消息队列业务
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
{
    /**
     * 发送消息到rabbitmq消息队列
     * @param message 消息内容
     * @param exchangeEnum 交换配置枚举
     * @param routingKey 路由key
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception;
}

send方法内有三个参数,解析如下:

  • message:发送消息内容,可以为任意类型,当然本章内仅仅是java.lang.String。
  • exchangeEnum:我们自定义的交换枚举类型,方便发送消息到指定交换。
  • routingKey:发送消息时的路由键内容,该值采用TopicEnum枚举内的topicRouteKey作为参数值。

下面我们来看看该接口的实现类QueueMessageServiceSupportsend方法实现,如下所示:

/**
 * 消息队列业务逻辑实现
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息队列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception {
        //发送消息到消息队列
        rabbitTemplate.convertAndSend(exchangeEnum.getName(),routingKey,message);
    }
}

我们通过RabbitTemplate实例的convertAndSend方法将对象类型转换成JSON字符串后发送到消息队列服务端,RabbitMQ接受到消息后根据注册的消费者并且路由规则筛选后进行消息转发,并实现消息的消费。

运行测试

为了方便测试我们创建一个名为UserService的实现类,如下所示:

/**
 * 用户业务逻辑
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
public class UserService
{
    /**
     * 消息队列发送业务逻辑
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 随机创建用户
     * 随机生成用户uuid编号,发送到消息队列服务端
     * @return
     * @throws Exception
     */
    public String randomCreateUser() throws Exception
    {
        //用户编号
        String userId = UUID.randomUUID().toString();
        //发送消息到rabbitmq服务端
        queueMessageService.send(userId, ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE, TopicEnum.USER_REGISTER.getTopicRouteKey());
        return userId;
    }
}

该类内添加了一个名为randomCreateUser随机创建用户的方法,通过UUID随机生成字符串作为用户的编号进行传递给用户注册消息队列,完成用户的模拟创建。

编写测试用例

接下来我们创建RabbitMqTester测试类来完成随机用户创建消息发送,测试用例完成简单的UserService注入,并调用randomCreateUser方法,如下所示:

/**
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqTopicProviderApplication.class)
public class RabbitMqTester
{
    /**
     * 用户业务逻辑
     */
    @Autowired
   private UserService userService;

    /**
     * 模拟随机创建用户 & 发送消息到注册用户消息队列
     * @throws Exception
     */
    @Test
    public void testTopicMessage() throws Exception
    {
        userService.randomCreateUser();
    }
}

到目前为止,我们的编码已经完成,下面我们按照下面的步骤启动测试:

  1. 启动rabbitmq-topic-consumer消息消费者模块,并查看控制台输出内容是否正常
  2. 运行rabbitmq-topic-provider模块测试用例方法testTopicMessage
  3. 查看rabbitmq-topic-consumer控制台输出内容

最终效果:

2017-12-30 18:39:16.819  INFO 2781 --- [           main] c.h.r.c.RabbitMqTopicConsumerApplication : 【【【【【Topic队列消息Consumer启动成功】】】】】
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.r.consumer.CreateAccountConsumer     : 用户:c6ef682d-da2e-4cac-a004-c244ff4c4503,注册成功,自动创建账户信息.
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.SendMailConsumer   : 用户:c6ef682d-da2e-4cac-a004-c244ff4c4503,注册成功,自动发送注册成功邮件.

总结

本章主要讲解了TopicExchange交换类型如何消费队列消息,讲解了常用到了的特殊字符#*如何匹配,解决了多模块下的队列配置信息无法自动创建问题。还有一点需要注意TopicExchange交换类型在消息消费时不存在固定的先后顺序!!!

本章源码已经上传到码云: SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏散尽浮华

Centos7.2下针对LDAP的完整部署记录

一、LDAP究竟是什么? LDAP是轻量目录访问协议,英文全称是Lightweight Directory Access Protocol,一般都简称为LDAP...

4.6K12
来自专栏Java架构师历程

ssh工作原理

1.通过Configuration().configure();读取并解析hibernate.cfg.xml配置文件

1173
来自专栏owent

集成Qt Webkit 到cocos2d-x

近期倒腾下客户端,想搞个cocos2d的工具。 之前的那个集成到Win32工具下的调试辅助工具是直接用的windows api。拓展起来巨麻烦。而且Windo...

1012
来自专栏葡萄城控件技术团队

Web API 持续集成:PostMan+Newman+Jenkins(图文讲解)

上篇文章我们已经完成了API测试工具选型,接下来是一系列周期性的开发测试过程:接口开发、检出代码、运行测试、记录结果、发送报告。为了快速发现问题,并减少重复过程...

1222
来自专栏C/C++基础

C++实现简易log日志系统

在软件开发周期中,不管是前台还是后台,系统一般会采用一个持久化的日志系统来记录运行情况。

3532
来自专栏IMWeb前端团队

pm2模块编写入门

本文作者:IMWeb zzbozheng 原文出处:IMWeb社区 未经同意,禁止转载 PM2 模块 PM2模块是通过PM2来安装和管理,代码可以托管...

2086
来自专栏java一日一条

Java Spring中同时访问多种不同数据库

开发企业应用时我们常常遇到要同时访问多种不同数据库的问题,有时是必须把数据归档到某种数据仓库中,有时是要把数据变更推送到第三方数据库中。使用Spring框架时,...

1561
来自专栏JetpropelledSnake

RESTful源码学习笔记之RPC和Restful深入理解

RPC 即远程过程调用(Remote Procedure Call Protocol,简称RPC),像调用本地服务(方法)一样调用服务器的服务(方法)。通常的实...

833
来自专栏Java技术栈

Jodd - Java界的瑞士军刀轻量级工具包!

2292
来自专栏生信宝典

生信人写程序2. Editplus添加Perl, Shell, R, markdown模板和语法高亮

前言 “工欲善其事必先利其器”,生信工程师每天写代码、搭流程,而且要使用至少三门编程语言,没有个好集成开发环境(IDE,Integrated Developme...

2968

扫码关注云+社区