前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rebbitMQ【rebbitMQ入门到精通】

rebbitMQ【rebbitMQ入门到精通】

作者头像
高大北
发布2022-09-07 14:29:40
3870
发布2022-09-07 14:29:40
举报
文章被收录于专栏:java架构计划训练营

MQ架构设计原理

什么是消息中间件

消息中间件基于队列模型实现异步/同步传输数据 作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。

传统的http请求存在那些缺点

  1. Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到 服务器端有可能会导致我们服务器端处理请求堆积。
  2. Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。 所以一般都会在nginx入口实现限流,整合服务保护框架。
image-1657089454394
image-1657089454394
  1. http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待 过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。

注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理

Mq应用场景有那些

  1. 异步发送短信
  2. 异步发送新人优惠券
  3. 处理一些比较耗时的操作

为什么需要使用mq

可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。

同步发送http请求

image-1657089582553
image-1657089582553

客户端发送请求到达服务器端,服务器端实现会员注册业务逻辑, 1.insertMember() --插入会员数据 1s 2.sendSms()----发送登陆短信提醒 3s 3.sendCoupons()----发送新人优惠券 3s 总共响应需要6s时间,可能会导致客户端阻塞6s时间,对用户体验 不是很好。

多线程与MQ方式实现异步?

互联网项目: 客户端 安卓/IOS

服务器端:php/java 最好使用mq实现异步

多线程处理业务逻辑

image-1657089631274
image-1657089631274

用户向数据库中插入一条数据之后,在单独开启一个线程异步发送短信和优惠操作。 客户端只需要等待1s时间 优点:适合于小项目 实现异步 缺点:有可能会消耗服务器cpu资源资源

Mq处理业务逻辑

image-1657089682753
image-1657089682753

先向数据库中插入一条会员数据,让后再向MQ中投递一个消息,MQ服务器端在将消息推送给消费者异步解耦处理发送短信和优惠券。

Mq与多线程之间区别

MQ可以实现异步/解耦/流量削峰问题; 多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。

Mq消息中间件名词

Producer 生产者:投递消息到MQ服务器端; Consumer 消费者:从MQ服务器端获取消息处理业务逻辑; Broker MQ服务器端 Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题 Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表 Message 生产者投递消息报文:json

主流mq区别对比

image-1657089904232
image-1657089904232

Mq设计基础知识

多线程版本mq; 基于网络通讯版本mq netty实现

基于多线程队列简单实现mq(单机)

代码语言:javascript
复制
public class GtfThreadMQ {

    private static LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>();

    public static void main(String[] args) {
        //生产者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    queue.put("userid");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"生产者").start();
        //消费者线程
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("1");
                while (true){
                    String poll = queue.poll();
                    if (poll != null) {
                        System.out.println(poll);
                    }
                }
            }
        },"消费者").start();

    }
}

基于netty实现mq

消费者netty客户端与nettyServer端MQ服务器端保持长连接,MQ服务器端保存 消费者连接。 生产者netty客户端发送请求给nettyServer端MQ服务器端,MQ服务器端在将该 消息内容发送给消费者。

image-1657092513308
image-1657092513308

RabbitMQ

RabbitMQ基本介绍

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。 RabitMQ官方网站: https://www.rabbitmq.com/

image-1657092774045
image-1657092774045
image-1657092780332
image-1657092780332
  1. 点对点(简单)的队列
  2. 工作(公平性)队列模式
  3. 发布订阅模式
  4. 路由模式Routing
  5. 通配符模式Topics
  6. RPC https://www.rabbitmq.com/getstarted.html

RabbitMQ环境的基本安装(忽略请自行搜索本博客)

快速入门RabbitMQ简单队列

image-1657096211151
image-1657096211151
image-1657096304268
image-1657096304268

依赖

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>3.6.5 </version>
    </dependency>
</dependencies>

获取链接

代码语言:javascript
复制
public class GetCon  {

    /**
     * 获取连接
     * @return
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.配置Host
        connectionFactory.setHost("127.0.0.1");
        //3.设置Port
        connectionFactory.setPort(5672);
        //4.设置账户和密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        //5.设置VirtualHost
        connectionFactory.setVirtualHost("gtf_virtualhost");
        return connectionFactory.newConnection();
    }
}

生产者

代码语言:javascript
复制
/**
 * 生产者
 */
public class Product {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = GetCon.getConnection();
        Channel channel = connection.createChannel();
        String msg="1";
//        channel.basicPublish(null,"gtf_queues",null,msg.getBytes());
        channel.basicPublish("","gtf_queues",null,msg.getBytes("UTF-8"));
        channel.close();
        connection.close();
    }
}

消费者

代码语言:javascript
复制
public class Consumer {
    private static final String QUEUE_NAME = "gtf_queues";

    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = GetCon.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                // 消费者完成 消费该消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}

RabbitMQ如何保证消息不丢失

Mq如何保证消息不丢失:

  1. 生产者角色 确保生产者投递消息到MQ服务器端成功。 Ack 消息确认机制 同步或者异步的形式 方式1:Confirms 方式2:事务消息
  2. 消费者角色 在rabbitmq情况下: 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。 在kafka中的情况下: 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。
  3. Mq服务器端 在默认的情况下 都会对队列中的消息实现持久化 持久化硬盘。

RabbitMQ五种消息模式

RabitMQ工作队列

默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

image-1657101286175
image-1657101286175

采用工作队列 在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。 channel.basicQos(1);

RabbitMQ交换机类型

Direct exchange(直连交换机) Fanout exchange(扇型交换机) Topic exchange(主题交换机) Headers exchange(头交换机) /Virtual Hosts—区分不同的团队 ----队列 存放消息 ----交换机 路由消息存放在那个队列中 类似于nginx —路由key 分发规则

RabbitMQ Fanout 发布订阅

生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。

image-1657101588146
image-1657101588146
image-1657101593473
image-1657101593473

原理:

  1. 需要创建两个队列 ,每个队列对应一个消费者;
  2. 队列需要绑定我们交换机
  3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
  4. 消费者从队列中获取这个消息。

SpringBoot整合RabbitMQ

maven

代码语言:javascript
复制
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.RELEASE</version>
</parent>
<dependencies>

    <!-- springboot-web组件 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- 添加springboot对amqp的支持 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <!--fastjson -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.49</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

配置类

代码语言:javascript
复制
@Component
public class RabbitMQConfig {
    /**
     * 定义交换机
     */
    private String EXCHANGE_SPRINGBOOT_NAME = "/gtf_ex";


    /**
     * 短信队列
     */
    private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
    /**
     * 邮件队列
     */
    private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";

    /**
     * 配置smsQueue
     *
     * @return
     */
    @Bean
    public Queue smsQueue() {
        return new Queue(FANOUT_SMS_QUEUE);
    }

    /**
     * 配置emailQueue
     *
     * @return
     */
    @Bean
    public Queue emailQueue() {
        return new Queue(FANOUT_EMAIL_QUEUE);
    }

    /**
     * 配置fanoutExchange
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
    }

    // 绑定交换机 sms
    @Bean
    public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(smsQueue).to(fanoutExchange);
    }

    // 绑定交换机 email
    @Bean
    public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(emailQueue).to(fanoutExchange);
    }
}

application.yml

代码语言:javascript
复制
spring:
  rabbitmq:
    ####连接地址
    host: 127.0.0.1
    ####端口号
    port: 5672
    ####账号
    username: admin
    ####密码
    password: 123456
    ### 地址
    virtual-host: /gtf_virtualhost
  redis:
    host: 127.0.0.1
#    password: 123456
    port: 6379

RabbitMQ实战解决方案

RabbitMQ死信队列

死信队列产生的背景 RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生死信队列的原因

  1. 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
  2. 队列达到最大的长度 (队列容器已经满了)
  3. 消费者消费多次消息失败,就会转移存放到死信队列中
image-1657265774340
image-1657265774340

死信队列的架构原理

死信队列和普通队列区别不是很大 普通与死信队列都有自己独立的交换机和路由key、队列和消费者。 区别: 1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到 普通队列中缓存起来,普通队列对应有自己独立普通消费者。 2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费 的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机 对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。

死信队列应用场景

1.30分钟订单超时设计 A. Redis过期key : B. 死信延迟队列实现: 采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后 就会将该消息转移到死信备胎消费者实现消费。 备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下 则会开始回滚库存操作。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制
  1. 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下 在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。 需要人为指定重试次数限制问题
  2. 在什么情况下消费者需要实现重试策略?

A.消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试? 该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。 B. 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试? 该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。 可以将日志存放起来,后期通过定时任务或者人工补偿形式。 如果是重试多次还是失败消息,需要重新发布消费者版本实现消费 可以使用死信队列

如何合理选择消息重试
  1. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试 ?
  2. 消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-07-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MQ架构设计原理
    • 什么是消息中间件
      • 传统的http请求存在那些缺点
        • Mq应用场景有那些
          • 为什么需要使用mq
            • 同步发送http请求
            • 多线程处理业务逻辑
            • Mq处理业务逻辑
          • Mq与多线程之间区别
            • Mq消息中间件名词
              • 主流mq区别对比
                • Mq设计基础知识
                  • 基于多线程队列简单实现mq(单机)
                  • 基于netty实现mq
              • RabbitMQ
                • RabbitMQ基本介绍
                  • RabbitMQ环境的基本安装(忽略请自行搜索本博客)
                    • 快速入门RabbitMQ简单队列
                      • 依赖
                      • 获取链接
                      • 生产者
                      • 消费者
                    • Mq如何保证消息不丢失:
                      • RabbitMQ五种消息模式
                        • RabitMQ工作队列
                        • RabbitMQ交换机类型
                      • SpringBoot整合RabbitMQ
                        • maven
                        • 配置类
                        • application.yml
                      • RabbitMQ实战解决方案
                        • RabbitMQ死信队列
                        • 产生死信队列的原因
                        • 死信队列的架构原理
                        • 死信队列应用场景
                        • RabbitMQ消息幂等问题
                    相关产品与服务
                    短信
                    腾讯云短信(Short Message Service,SMS)可为广大企业级用户提供稳定可靠,安全合规的短信触达服务。用户可快速接入,调用 API / SDK 或者通过控制台即可发送,支持发送验证码、通知类短信和营销短信。国内验证短信秒级触达,99%到达率;国际/港澳台短信覆盖全球200+国家/地区,全球多服务站点,稳定可靠。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档