第四十六章:SpringBoot & RabbitMQ完成消息延迟消费

2018-3-1SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本完全基于Spring5.0来构建,JDK最低支持也从原来的1.6也改成了1.8,不再兼容1.8以下的版本,更多新特性请查看官方文档

本章目标

基于SpringBoot整合RabbitMQ完成消息延迟消费。

构建项目

注意前言

由于SpringBoot的内置扫描机制,我们如果不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置可以被SpringBoot扫描到,否则不会自动创建队列,控制台会输出404的错误信息。

SpringBoot 企业级核心技术学习专题

专题

专题名称

专题描述

001

Spring Boot 核心技术

讲解SpringBoot一些企业级层面的核心组件

002

Spring Boot 核心技术章节源码

Spring Boot 核心技术简书每一篇文章码云对应源码

003

Spring Cloud 核心技术

对Spring Cloud核心技术全面讲解

004

Spring Cloud 核心技术章节源码

Spring Cloud 核心技术简书每一篇文章对应源码

005

QueryDSL 核心技术

全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA

006

SpringDataJPA 核心技术

全面讲解SpringDataJPA核心技术

我们本章采用2.0.0.RELEASE版本的SpringBoot,添加相关的依赖如下所示:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
        <!--rabbbitMQ相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>
......

我们仍然采用多模块的方式来测试队列的Provider以及Consumer

队列公共模块

我们先来创建一个名为rabbitmq-common公共依赖模块(Create New Maven Module) 在公共模块内添加一个QueueEnum队列枚举配置,该枚举内配置队列的ExchangeQueueNameRouteKey等相关内容,如下所示:

package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
 * 消息队列枚举配置
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:33
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl队列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}

可以看到MESSAGE_QUEUE队列配置跟我们之前章节的配置一样,而我们另外新创建了一个后缀为ttl的消息队列配置。我们采用的这种方式是RabbitMQ消息队列其中一种的延迟消费模块,通过配置队列消息过期后转发的形式。

这种模式比较简单,我们需要将消息先发送到ttl延迟队列内,当消息到达过期时间后会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的队列内完成消息消费。

下面我们来模拟消息通知的延迟消费场景,先来创建一个名为MessageRabbitMqConfiguration的队列配置类,该配置类内添加消息通知队列配置以及消息通过延迟队列配置,如下所示:

/**
 * 消息通知 - 消息队列配置信息
 *
 * @author:恒宇少年 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:32
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class MessageRabbitMqConfiguration {
    /**
     * 消息中心实际消费队列交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延迟消费交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心实际消费队列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL队列
     *
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心实际消息交换与队列绑定
     *
     * @param messageDirect 消息中心交换配置
     * @param messageQueue  消息中心队列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL绑定实际消息中心实际消费交换机
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }
}

我们声明了消息通知队列的相关ExchangeQueueBinding等配置,将message.center.create队列通过路由键message.center.create绑定到了message.center.direct交换上。

除此之外,我们还添加了消息通知延迟队列ExchangeQueueBinding等配置,将message.center.create.ttl队列通过message.center.create.ttl路由键绑定到了message.center.topic.ttl交换上。

我们仔细来看看messageTtlQueue延迟队列的配置,跟messageQueue队列配置不同的地方这里多出了x-dead-letter-exchangex-dead-letter-routing-key两个参数,而这两个参数就是配置延迟队列过期后转发的ExchangeRouteKey,只要在创建队列时对应添加了这两个参数,在RabbitMQ管理平台看到的队列配置就不仅是单纯的Direct类型的队列类型,如下图所示:

队列类型差异

在上图内我们可以看到message.center.create.ttl队列多出了DLXDLK的配置,这就是RabbitMQ死信交换的标志。 满足死信交换的条件,在官方文档中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.

  • 该消息被拒绝(basic.reject或 basic.nack),requeue = false
  • 消息的TTL过期
  • 队列长度限制已超出 官方文档地址

我们需要满足上面的其中一种方式就可以了,我们采用满足第二个条件,采用过期的方式。

队列消息提供者

我们再来创建一个名为rabbitmq-lazy-provider的模块(Create New Maven Module),并且在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

配置队列

resource下创建一个名为application.yml的配置文件,在该配置文件内添加如下配置信息:

spring:
  #rabbitmq消息队列配置信息
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /hengboy
    publisher-confirms: true

消息提供者类

接下来我们来创建名为MessageProvider消息提供者类,用来发送消息内容到消息通知延迟队列,代码如下所示:

/**
 * 消息通知 - 提供者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:40
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
public class MessageProvider {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    /**
     * RabbitMQ 模版消息实现类
     */
    @Autowired
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 发送延迟消息
     *
     * @param messageContent 消息内容
     * @param exchange       队列交换
     * @param routerKey      队列交换绑定的路由键
     * @param delayTimes     延迟时长,单位:毫秒
     */
    public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
        if (!StringUtils.isEmpty(exchange)) {
            logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
            // 执行发送消息到指定队列
            rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        } else {
            logger.error("未找到队列消息:{},所属的交换机", exchange);
        }
    }
}

由于我们在 pom.xml配置文件内添加了RabbitMQ相关的依赖并且在上面application.yml文件内添加了对应的配置,SpringBoot为我们自动实例化了AmqpTemplate,该实例可以发送任何类型的消息到指定队列。 我们采用convertAndSend方法,将消息内容发送到指定ExchangeRouterKey队列,并且通过setExpiration方法设置过期时间,单位:毫秒。

编写发送测试

我们在test目录下创建一个测试类,如下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
    /**
     * 消息队列提供者
     */
    @Autowired
    private MessageProvider messageProvider;

    /**
     * 测试延迟消息消费
     */
    @Test
    public void testLazy() {
        // 测试延迟10秒
        messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),
                QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
                QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
                10000);
    }
}

注意:@SpringBootTest注解内添加了classes入口类的配置,因为我们是模块创建的项目并不是默认创建的SpringBoot项目,这里需要配置入口程序类才可以运行测试。

在测试类我们注入了MessageProvider消息提供者,调用sendMessage方法发送消息到消息通知延迟队列,并且设置延迟的时间为10秒,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration配置类内的相关Binding配置,通过ExchangeRouterKey值进行发送到指定的队列。

到目前为止我们的rabbitmq-lazy-provider消息提供模块已经编写完成了,下面我们来看看消息消费者模块。

队列消息消费者

我们再来创建一个名为rabbitmq-lazy-consumer的模块(Create New Maven Module),同样需要在pom.xml配置文件内添加rabbitmq-common模块的依赖,如下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>

当然同样需要在resource下创建application.yml并添加消息队列的相关配置,代码就不贴出来了,可以直接从rabbitmq-lazy-provider模块中复制application.yml文件到当前模块内。

消息消费者类

接下来创建一个名为MessageConsumer的消费者类,该类需要监听消息通知队列,代码如下所示:

/**
 * 消息通知 - 消费者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午5:00
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @RabbitHandler
    public void handler(String content) {
        logger.info("消费内容:{}", content);
    }
}

@RabbitListener注解内配置了监听的队列,这里配置内容是QueueEnum枚举内的queueName属性值,当然如果你采用常量的方式在注解属性上是直接可以使用的,枚举不支持这种配置,这里只能把QueueName字符串配置到queues属性上了。 由于我们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler处理方法的参数内要保持数据类型一致!

消费者入口类

我们为消费者模块添加一个入口程序类,用于启动消费者,代码如下所示:

/**
 * 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】
 * 队列消费者模块 - 入口程序类
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:55
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
    }
}

测试

我们的代码已经编写完毕,下面来测试下是否完成了我们预想的效果,步骤如下所示:

1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容

我们可以在消费者模块控制台看到输出内容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018

我们在提供者测试方法发送消息的时间为10:10:24,而真正消费的时间则为10:10:34,与我们预计的一样,消息延迟了10秒后去执行消费。

总结

终上所述我们完成了消息队列的延迟消费,采用死信方式,通过消息过期方式触发,在实际项目研发过程中,延迟消费还是很有必要的,可以省去一些定时任务的配置。

本章源码已经上传到码云: SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏c#开发者

DataGrid和CheckBox的混合使用

我们知道DataGrid是非常强大的一个ASP.NET组件,我们可以用它表示非常丰富的信息.在论坛里经常可以看见一些网友问一些关于该控件的问题,我虽不是什么高手...

3369
来自专栏熊二哥

SpringBoot详细研究-03系统集成

据说杰克船长被黑客盗片了,看来信息安全依然任重而道远,本文以此为引子,来介绍下spring boot对于系统集成方面的支持。 ? ? Spring Secur...

4166
来自专栏架构师之旅

《Spring敲门砖之基础教程第一季》 第一章(2)解读Spring Framework

系统架构 一个成功的项目离不开一个好的架构,一个好的架构自然需要一位好的设计师, Rod Johnson正是Spring的前生总架构设计师,那...

1736
来自专栏阿杜的世界

Spring AOP的最佳实践一、异常处理二、安全检查三、缓存

抛开业界对checked exception和unchecked exception的论战不谈,重点看着两类异常的应用场景:

723
来自专栏JAVA高级架构

一个高性能、轻量级的分布式内存队列系统--beanstalk

Beanstalk是一个高性能、轻量级的、分布式的、内存型的消息队列系统。最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟。...

3449
来自专栏Android干货

AsyncTask.cancel()的结束问题

2755
来自专栏java一日一条

Spring Boot + Mybatis + Redis二级缓存开发指南

Spring-Boot因其提供了各种开箱即用的插件,使得它成为了当今最为主流的Java Web开发框架之一。Mybatis是一个十分轻量好用的ORM框架。Red...

813
来自专栏kl的专栏

Apollo应用之动态调整线上数据源(DataSource)

博主之前写过使用apollo的配置动态推送能力来动态修改线上环境的日志输出级别,具体可见《spring boot动态调整线上日志级别》,今天来实现一个类似的应用...

3987
来自专栏LanceToBigData

Spring(一)Spring的第一滴血

前言   开始工作了,但是一进来公司本来是做爬虫和数据分析的,但是走了一个后端的,导致我必须要去顶替他的工作。因为这个项目使用的是Spring、   Sprin...

2146
来自专栏JavaWeb

Web服务器加速之Tomcat7性能如何调优

3836

扫码关注云+社区