前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费

第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费

作者头像
恒宇少年
发布2018-06-27 16:07:04
6760
发布2018-06-27 16:07:04
举报

在上一章第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费我们讲解到了RabbitMQ消息队列的DirectExchange路由键消息单个消费者消费,源码请访问SpringBoot对应章节源码下载查看,消息队列目的是完成消息的分布式消费,那么我们是否可以为一个Provider创建并绑定多个Consumer呢?

本章目标

基于SpringBoot平台整合RabbitMQ消息队列,完成一个Provider绑定多个Consumer进行消息消费。

SpringBoot 企业级核心技术学习专题

专题

专题名称

专题描述

001

Spring Boot 核心技术

讲解SpringBoot一些企业级层面的核心组件

002

Spring Boot 核心技术章节源码

Spring Boot 核心技术简书每一篇文章码云对应源码

003

Spring Cloud 核心技术

对Spring Cloud核心技术全面讲解

004

Spring Cloud 核心技术章节源码

Spring Cloud 核心技术简书每一篇文章对应源码

005

QueryDSL 核心技术

全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA

006

SpringDataJPA 核心技术

全面讲解SpringDataJPA核心技术

构建项目

我们基于上一章的项目进行升级,我们先来将Chapter41项目Copy一份命名为Chapter42

构建 rabbitmq-consumer-node2

基于我们复制的Chapter42项目,创建一个Module子项目命名为rabbitmq-consumer-node2,用于消费者的第二个节点,接下来我们为rabbitmq-consumer-node2项目创建一个入口启动类RabbitmqConsumerNode2Application,代码如下所示:

代码语言:javascript
复制
/**
 * 消息队列消息消费者节点2入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerNode2Application.class,args);

        logger.info("【【【【【消息队列-消息消费者节点2启动成功.】】】】】");
    }
}

为了区分具体的消费者节点,我们在项目启动成功后打印了相关的日志信息,下面我们来编写application.properties配置文件信息,可以直接从rabbitmq-consumer子项目内复制内容,复制后需要修改server.port以及spring.application.name,如下所示:

代码语言:javascript
复制
#端口号
server.port=1112
#项目名称
spring.application.name=rabbitmq-consumer-node2


#rabbitmq相关配置
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

因为我们是本地测试项目,所以需要修改对应的端口号,防止端口被占用。

创建用户注册消费者

复制rabbitmq-consumer子项目内的UserConsumer类到rabbitmq-consumer-node2子项目对应的package内,如下所示:

代码语言:javascript
复制
/**
 * 用户注册消息消费者
 * 分布式节点2
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(UserConsumer.class);

    @RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点2】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }
}

为了区分具体的消费者输出内容,我们在上面UserConsumer消费者消费方法内打印了相关日志输出,下面我们同样把rabbitmq-consumer子项目内UserConsumer的消费方法写入相关日志,如下所示:

代码语言:javascript
复制
@RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点1】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }

到目前为止我们的多节点RabbitMQ消费者已经编写完成,下面我们来模拟多个用户注册的场景,来查看用户注册消息是否被转发并唯一性的分配给不同的消费者节点。

运行测试

我们打开上一章编写的UserTester测试类,为了模拟多用户注册请求,我们对应的创建一个内部线程类BatchRabbitTester,在线程类内编写注册请求代码,如下所示:

代码语言:javascript
复制
 /**
     * 批量添加用户线程测试类
     * run方法发送用户注册请求
     */
    class BatchRabbitTester implements Runnable
    {
        private int index;
        public BatchRabbitTester() { }

        public BatchRabbitTester(int index) {
            this.index = index;
        }


        @Override
        public void run() {
            try {
                mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                        .param("userName","yuqiyu" + index)
                        .param("name","恒宇少年" + index)
                        .param("age","23")
                )
                        .andDo(MockMvcResultHandlers.log())
                        .andReturn();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }

为了区分每一个注册信息是否都已经写入到数据库,我们为BatchRabbitTester添加了一个有参的构造方法,将for循环的i值对应的传递为index的值。下面我们来编写对应的批量注册的测试方法,如下所示:

代码语言:javascript
复制
 /**
     * 测试用户批量添加
     * @throws Exception
     */
    @Test
    public void testBatchUserAdd() throws Exception
    {
        for (int i = 0 ; i < 10 ; i++) {
            //创建用户注册线程
            Thread thread = new Thread(new BatchRabbitTester(i));
            //启动线程
            thread.start();
        }
        //等待线程执行完成
        Thread.sleep(2000);
    }

我们循环10次来测试用户注册请求,每一次都会创建一个线程去完成发送注册请求逻辑,在方法底部添加了sleep方法,目的是为了阻塞测试用例的结束,因为我们测试用户完成方法后会自动停止,不会去等待其他线程执行完成,所以这里我们阻塞测试主线程来完成发送注册线程请求逻辑。

执行批量注册测试方法

我们在执行测试批量注册用户消息之前,先把rabbitmq-consumerrabbitmq-consumer-node2两个消费者子项目启动,项目启动完成后可以看到控制台输出启动成功日志,如下所示:

代码语言:javascript
复制
rabbitmq-consumer:
2017-12-10 17:10:36.961  INFO 15644 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】

rabbitmq-consumer-node2:
2017-12-10 17:11:31.679  INFO 13812 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】

接下来我们来运行testBatchUserAdd方法,查看测试控制台输出内容如下所示:

代码语言:javascript
复制
2017-12-10 17:15:02.619  INFO 14456 --- [       Thread-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
 回调id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息发送成功
 回调id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息发送成功
 回调id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息发送成功
 回调id:39103357-6c80-4561-acb7-79b32d6171c9
消息发送成功
 回调id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息发送成功
 回调id:e9b8b828-f069-455f-a366-380bf10a5909
消息发送成功
 回调id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息发送成功
 回调id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息发送成功
 回调id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息发送成功
 回调id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息发送成功

可以看到确实已经成功的发送了10条用户注册消息到RabbitMQ服务端,那么是否已经正确的成功的将消息转发到消费者监听方法了呢?我们来打开rabbitmq-consumer子项目的启动控制台查看日志输出内容如下所示:

代码语言:javascript
复制
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】
2017-12-10 17:15:02.695  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:20
2017-12-10 17:15:02.718  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:22
2017-12-10 17:15:02.726  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:26
2017-12-10 17:15:02.729  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:21
2017-12-10 17:15:02.789  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:28

可以看到成功的接受了5条对应用户注册消息内容,不过这里具体接受的条数并不是固定的,这也是RabbitMQ消息转发权重内部问题。

下面我们打开rabbitmq-consumer-node2子项目控制台查看日志输出内容如下所示:

代码语言:javascript
复制
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】
2017-12-10 17:15:02.708  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:25
2017-12-10 17:15:02.717  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:23
2017-12-10 17:15:02.719  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:24
2017-12-10 17:15:02.727  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:27
2017-12-10 17:15:02.790  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:29

同样获得了5条用户注册消息,不过并没有任何规律可言,编号也不是顺序的。

所以多节点时消息具体分发到哪个节点并不是固定的,完全是RabbitMQ分发机制来控制。

总结

本章完成了基于SpringBoot平台整合RabbitMQ单个Provider对应绑定多个Consumer来进行多节点分布式消费者消息消费,实际生产项目部署时完全可以将消费节点分开到不同的服务器,只要消费节点可以访问到RabbitMQ服务端,可以正常通讯,就可以完成消息消费。

本章源码已经上传到码云:

SpringBoot配套源码地址:https://gitee.com/hengboy/spring-boot-chapter

SpringCloud配套源码地址:https://gitee.com/hengboy/spring-cloud-chapter

SpringBoot相关系列文章请访问:目录:SpringBoot学习目录

QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录

SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.12.10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 本章目标
  • SpringBoot 企业级核心技术学习专题
  • 构建项目
    • 构建 rabbitmq-consumer-node2
    • 运行测试
    • 总结
    相关产品与服务
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档