前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >spring-boot-route(十三)整合RabbitMQ消息队列

spring-boot-route(十三)整合RabbitMQ消息队列

作者头像
Java旅途
发布2020-10-21 11:09:06
7390
发布2020-10-21 11:09:06
举报
文章被收录于专栏:Java旅途Java旅途

这篇是SpringBoot整合消息队列的第一篇文章,我们详细介绍下消息队列的相关内容。

消息队列简介

1. 什么是消息队列

MQ(Message Quene):通过典型的生产者和消费者模型,生产者不断向消息队列中产生消息,消费者不断的从队列中获取消息。因为生产者和消费者都是异步的,而且生产者只关心消息的发送,消费者只关心消息的接收,没有业务逻辑的侵入,轻松实现业务解耦。

2. 消息队列有什么用

  • 异步处理

场景描述:某商场具有注册功能,注册的时候需要发送短信验证码。

传统的做法是用户提交信息到用户服务,用户服务调用短信服务发送短信,然后给用户返回响应,这种是同步的处理方式,耗时较长。加入消息队列后,用户直接提交信息到用户服务,将信息写入消息队列,直接给用户返回响应,短信服务从消息队列中读取消息进行发送短信。

  • 应用解耦

场景描述:某商场下单流程。

传统做法是用户下单,订单系统去查询库存系统,如果库存系统宕机了,则下单失败,损失订单量。加入消息队列后,用户下单,订单系统记录订单,将订单信息写入消息队列,下单成功,然后库存系统恢复正常后去操作数据库库存(不考虑库存为0的情况)。这样订单系统和库存系统就达到松耦合的目的了

  • 流量削峰

场景描述:秒杀活动。

流量过大肯定会导致响应超时或系统宕机,加入消息队列,用户秒杀请求写入消息队列,设置消息队列的长度等属性,达到消息队列最大长度后,直接返回秒杀失败,然后再去消费消息队列的数据,完成秒杀。

RabbitMQ简介

RabbitMQ是用Erlang语言编写的,实现了高级消息队列协议(AMQP)的消息中间件。

1. AMQP协议概念

AMQPAMQP是一种链接协议,直接定义网络交换的数据格式,这使得实现了AMQPprovider本身就是跨平台的。以下是AMQP协议模型:

  • server - 又称broker,接收客户端的链接,实现amqp实体服务。
  • Connection - 链接,应用程序跟broker的网络链接。
  • channel - 网络信道,几乎所有的操作都是在channel中进行,数据的流转都要在channel上进行。channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
  • message - 消息,服务器与应用程序之间传送的数据。由properties和body组成。properties可以对消息进行修饰,比如消息的升级,延迟等高级特性。body就是消息体的内容。
  • virtual host - 虚拟主机,用于进行逻辑隔离,最上层的消息路由,一个虚拟地址里面可以有多个交换机。exchange和消息队列message quene。
  • exchange - 交换机,接收消息,根据路由器转发消息到绑定的队列。
  • binding - 绑定,交换机和队列之间的虚拟链接,绑定中可以包含routing key。
  • routing key - 一个路由规则,虚拟机可以用它来确定jiekyi如何路由一个特定消息。
  • quene - 消息队列,保存消息并将它们转发给消费者。

2. RabbitMQ的消息模型

1. 简单模型

在上图中:

  • p:生成者
  • C:消费者
  • 红色部分:quene,消息队列
2. 工作模型

在上图中:

  • p:生成者
  • C1、C2:消费者
  • 红色部分:quene,消息队列

当消息处理比较耗时时,就会出现生产消息的速度远远大于消费消息的速度,这样就会出现消息堆积,无法及时处理。这时就可以让多个消费者绑定一个队列,去消费消息,队列中的消息一旦消费就会丢失,因此任务不会重复执行。

3. 广播模型(fanout)

这种模型中生产者发送的消息所有消费者都可以消费。

在上图中:

  • p:生成者
  • X:交换机
  • C1、C2:消费者
  • 红色部分:quene,消息队列
4. 路由模型(routing)

这种模型消费者发送的消息,不同类型的消息可以由不同的消费者去消费。

在上图中:

  • p:生成者
  • X:交换机,接收到生产者的消息后将消息投递给与routing key完全匹配的队列
  • C1、C2:消费者
  • 红色部分:quene,消息队列
5. 订阅模型(topic)

这种模型和direct模型一样,都是可以根据routing key将消息路由到不同的队列,只不过这种模型可以让队列绑定routing key 的时候使用通配符。这种类型的routing key都是由一个或多个单词组成,多个单词之间用.分割。

通配符介绍:

*:只匹配一个单词

#:匹配一个或多个单词

6. RPC模型

这种模式需要通知远程计算机运行功能并等待返回运行结果。这个过程是阻塞的。

当客户端启动时,它创建一个匿名独占回调队列。并提供名字为call的函数,这个call会发送RPC请求并且阻塞直到收到RPC运算的结果。

Spring Boot整合RabbitMQ

第一步:引入pom依赖

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:增加RabbitMQ服务配置信息

代码语言:javascript
复制
spring:
  rabbitmq:
    virtual-host: javatrip
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest

这里我们用广播模型来举例使用,广播模型(fanout)比较好理解,就像公众号一样,我每天推文章后,会推送给每个关注用户,他们都可以看到这条消息。

广播模型注意点:

  1. 可以有多个队列
  2. 每个队列都需要绑定交换机
  3. 每个消费者有自己的队列
  4. 交换机把消息发送给绑定过的所有队列

1. 定义两个队列

代码语言:javascript
复制
@Configuration
public class RabbitConfig {

    final static String queueNameA = "first-queue";
    final static String queueNameB = "second-queue";

    /***
     * 定义一个队列,设置队列属性
     * @return
     */
    @Bean("queueA")
    public Queue queueA(){

        Map<String,Object> map = new HashMap<>();
        // 消息过期时长,10秒过期
        map.put("x-message-ttl",10000);
        // 队列中最大消息条数,10条
        map.put("x-max-length",10);
        // 第一个参数,队列名称
        // 第二个参数,durable:持久化
        // 第三个参数,exclusive:排外的,
        // 第四个参数,autoDelete:自动删除
        Queue queue = new Queue(queueNameA,true,false,false,map);
        return queue;
    }
    
    @Bean("queueB")
    public Queue queueB(){

        Map<String,Object> map = new HashMap<>();
        // 消息过期时长,10秒过期
        map.put("x-message-ttl",10000);
        // 队列中最大消息条数,10条
        map.put("x-max-length",10);
        // 第一个参数,队列名称
        // 第二个参数,durable:持久化
        // 第三个参数,exclusive:排外的,
        // 第四个参数,autoDelete:自动删除
        Queue queue = new Queue(queueNameB,true,false,false,map);
        return queue;
    }
}

2. 定义扇形交换机

代码语言:javascript
复制
@Bean
public FanoutExchange fanoutExchange(){

    // 第一个参数,交换机名称
    // 第二个参数,durable,是否持久化
    // 第三个参数,autoDelete,是否自动删除
    FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false);
    return fanoutExchange;
}

3. 交换机和队列绑定

代码语言:javascript
复制
@Bean
public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange);
    return binding;
}

@Bean
public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){
    Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange);
    return binding;
}

4. 创建两个消费者分别监听两个队列

代码语言:javascript
复制
@RabbitListener(queues = RabbitConfig.queueNameA)
@Component
@Slf4j
public class ConsumerA {

    @RabbitHandler
    public void receive(String message){
        log.info("消费者A接收到的消息:"+message);
    }
}
代码语言:javascript
复制
@RabbitListener(queues = RabbitConfig.queueNameB)
@Component
@Slf4j
public class ConsumerB {

    @RabbitHandler
    public void receive(String message){
        log.info("消费者B接收到的消息:"+message);
    }
}

5. 创建生产者生产消息

代码语言:javascript
复制
@RestController
public class provider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("send")
    public void sendMessage(){

        String message = "你好,我是Java旅途";
        rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message);
    }
}

这样生产者发送一条消息后,两个消费者就能同时消费到消息了。

< END >

此是spring-boot-route系列的第十三篇文章,这个系列的文章都比较简单,主要目的就是为了帮助初次接触Spring Boot 的同学有一个系统的认识。本文已收录至我的github,欢迎各位小伙伴star!点击文末的阅读原文即可到达github仓库!

github:https://github.com/binzh303/spring-boot-route

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java旅途 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息队列简介
    • 1. 什么是消息队列
      • 2. 消息队列有什么用
      • RabbitMQ简介
        • 1. AMQP协议概念
          • 2. RabbitMQ的消息模型
            • 1. 简单模型
            • 2. 工作模型
            • 3. 广播模型(fanout)
            • 4. 路由模型(routing)
            • 5. 订阅模型(topic)
            • 6. RPC模型
        • Spring Boot整合RabbitMQ
          • 1. 定义两个队列
            • 2. 定义扇形交换机
              • 3. 交换机和队列绑定
                • 4. 创建两个消费者分别监听两个队列
                  • 5. 创建生产者生产消息
                  相关产品与服务
                  消息队列 CMQ 版
                  消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档