前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq 笔记,一篇文章入门

RabbitMq 笔记,一篇文章入门

作者头像
一写代码就开心
发布2022-05-09 10:40:14
6550
发布2022-05-09 10:40:14
举报
文章被收录于专栏:java和python

目录

默认的端口15672:rabbitmq管理平台端口号 默认的端口5672: rabbitmq消息中间内部通讯的端口 默认的端口号25672 rabbitmq集群的端口号

在这里插入图片描述
在这里插入图片描述

传统的http请求存在那些缺点

我们学习mq之前,我们可以看看传统的http请求有什么缺点?

在这里插入图片描述
在这里插入图片描述

1 浏览器发送http请求,在高并发情况下,会对服务器造成压力; 2 有的服务器会设置最大的请求线程数,如果高并发,剩余的会放到队列里面,队列里面的线程多了,也会造成服务器崩溃; 3 如果这个请求的逻辑里面,处理的业务是比较的大,比较的耗时,这样客户端就会一直的等待,或者超时之后,客户端会一直尝试的重新请求,这样都是问题;

注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。

为什么需要使用mq

可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。

java代码使用多线程的缺点

多线程会造成上下文的切换,抢cpu,会对主业务造成问题,有可能延迟主业务的执行时间;

优点:适合于小项目 实现异步 缺点:有可能会消耗服务器cpu资源资源

rabbitmq安装

我们是在docker里面安装的rabbitmq,所以很快

在这里插入图片描述
在这里插入图片描述

Virtual Hosts

在RabbitMQ中可以虚拟消息服务器VirtualHost,每

个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互

隔离的。exchange、queue、message不能互通。

代码语言:javascript
复制
比如我们一个团队,做支付,做会员,都需要用到这个rabbitmq,我们就可以
使用这个VirtualHost  进行区分
在这里插入图片描述
在这里插入图片描述

入门案例(一个消费者)

1 创建一个maven项目 2 pom文件

代码语言:javascript
复制
  <!--指定 jdk 编译版本--> <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

    <dependencies>
            <!--rabbitmq 依赖客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.8.0</version>
            </dependency>
            <!--操作文件流的一个依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
    </dependencies>

3 创建发送者的消息controller

代码语言:javascript
复制
public class Producer {
//    创建一个队列
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        try(
             Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            /**
             * 生成一个队列(queueDeclare  里面的参数)
             * 1.队列名称
             * 2.队列里面的消息是否持久化 默认消息存储在内存中
             * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             * 5.其他参数
             */
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            String message="hello world";
            /**
             * 发送一个消息(basicPublish 参数)
             * 1.发送到那个交换机
             * 2.路由的 key 是哪个
             * 3.其他的参数信息
             * 4.发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
    }
}

执行完以上的

在这里插入图片描述
在这里插入图片描述

4 消费者的代码

代码语言:javascript
复制
public class Consume {

    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");
        //推送的消息如何进行消费的接口回调
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String message= new String(delivery.getBody());
            System.out.println(message);
        };
        //取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };

        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

执行这个代码,会一直运行,就等的队列,只要队列里面有消息,那么就立马输出;

多个消费者的案例

问题

在这里插入图片描述
在这里插入图片描述

实现

1 创建一个链接rabbitmq的工具类

代码语言:javascript
复制
public class RabbitMqUtils {

    //得到一个连接的 channel
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.129");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

2 创建消费者

代码语言:javascript
复制
public class Consume02 {

    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");

        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

3 使用idea工具,将同一个文件启动多次

在这里插入图片描述
在这里插入图片描述

勾选了那个就是多线程了;

在这里插入图片描述
在这里插入图片描述

4 写生产者的代码

代码语言:javascript
复制
public class Producer01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        try(Channel channel= RabbitMqUtils.getChannel();) {
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台当中接受信息
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }
}
在这里插入图片描述
在这里插入图片描述

消息应答

为什么要有这个

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了

代码语言:javascript
复制
就是为了防止消息丢失

自动应答

就是消费者一拿到队列里面的消息,就告诉队列,成功消费了,这种是不可取的,因为万一拿到之后,处理过程中报错了咋办;

代码语言:javascript
复制
想要性能,就用自动应答,因为速度快,但是安全性较低,有可能丢失消息

手动应答

里面有3个方法

代码语言:javascript
复制
A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)

C.Channel.basicReject(用于否定确认) 
与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

手动应答的好处是可以批量应答并且减少网络拥堵

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

代码里面使用false,建议; 只应答当前处理完成的;

消息自动重新入队

代码语言:javascript
复制
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),
导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,
并将对其重新排队。如果此时其他消费者可以处理,
它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,
也可以确保不会丢失任何消息。

就是消费者没有返回ack,那么就将消息重新入队;

RabbitMQ 持久化

为什么持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。

队列如何实现持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的化,该队列就会被删除掉,如果 要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化

在这里插入图片描述
在这里插入图片描述

不要轮训分发(不公平分发)

能者多劳,建议使用不公平分发;

我们可以设置参数 channel.basicQos(1);设置在消费方

在这里插入图片描述
在这里插入图片描述

预取值

在这里插入图片描述
在这里插入图片描述

。该值定义通道上允许的未确认消息的最大数量。

在这里插入图片描述
在这里插入图片描述

发布确认

我们之前为了消息不丢失,要求了队列持久化,消息持久化,但是在消息持久化到磁盘之前,rabbitmq宕机了,咋办,消息还是会丢失的,所以我们需要第三个,就是在消息确保到硬盘的时候,返回给发送者一个确认机制,就是发布确认。

在这里插入图片描述
在这里插入图片描述

发布确认的策略

1 开启发布确认的方法 发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法(发送端)

在这里插入图片描述
在这里插入图片描述

单个确认发布(在生产端)

一个确认了,后面的才发

代码语言:javascript
复制
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个
消息之后只有它被确认发布,后续的消息才能继续发布,
waitForConfirmsOrDie(long)这个方法只有在消息被确认
的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。


这种确认方式有一个最大的缺点就是:发布速度特别的慢,

因为如果没有确认
发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条
发布消息的吞吐量。当然对于某
些应用程序来说这可能已经足够了。

批量确认发布(在生产端)

最主要是这两个代码

代码语言:javascript
复制
//开启发布确认
 channel.confirmSelect();
 
 //为了确保还有剩余没有确认消息 再次确认
 if (outstandingMessageCount > 0) {
 channel.waitForConfirms();
 }
代码语言:javascript
复制
public static void publishMessageBatch() throws Exception {
 try (Channel channel = RabbitMqUtils.getChannel()) {
 String queueName = UUID.randomUUID().toString();
 channel.queueDeclare(queueName, false, false, false, null);
 
//开启发布确认
 channel.confirmSelect();


 //批量确认消息大小
 int batchSize = 100;
 //未确认消息个数
 int outstandingMessageCount = 0;
 long begin = System.currentTimeMillis();
 for (int i = 0; i < MESSAGE_COUNT; i++) {
 String message = i + "";
 channel.basicPublish("", queueName, null, message.getBytes());
 outstandingMessageCount++;
 if (outstandingMessageCount == batchSize) {
 channel.waitForConfirms();
 outstandingMessageCount = 0;
 }
 }
 //为了确保还有剩余没有确认消息 再次确认
 if (outstandingMessageCount > 0) {
 channel.waitForConfirms();
 }
 long end = System.currentTimeMillis();
 System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) + 
"ms");
 } }

异步确认发布(在生产端)

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
就是发送端到rabbitmq之间有一个信道,在这个信道里面是一个队列,可以理解
为数组,每一个数组里面存到是map,键值对形式保存;

这个信道里面的消息到了rabbitmq里面的队列里面,不管是成功到达,还是失败
,都会异步返回给发送者,发送者不用管,因为是异步的,所以发送者只需要
一直发消息就可以了;

发送者根据key可以知道,那个成功,哪个失败;

什么是交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

直接(direct)

消息只去到它绑定的routingKey 队列中去

主题(topic) 路由

在这里插入图片描述
在这里插入图片描述

标题(headers)

扇出(fanout)发布订阅

它是将接收到的所有消息广播到它知道的所有队列中。 和路由键是没有关系的,不管队列里面的路由键一样还是不一样,都会发到所有的队列;

临时队列

一旦我们断开了消费者的连接,队列将被自动删除。也就是不是持久化的队列,就是临时队列;

在这里插入图片描述
在这里插入图片描述

绑定(bindings)

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系

死信队列

无法被消费的消息,放到死信队列,之后再处理

应用场景为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

在这里插入图片描述
在这里插入图片描述

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。

高级发布确认

之前是消息到达队列了,在准备持久化之前,宕机了,要进行确认,现在是准备发消息呀,发现rabbitmq宕机了,宕机的时间是不一样的

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目录
  • 传统的http请求存在那些缺点
  • 为什么需要使用mq
  • java代码使用多线程的缺点
  • rabbitmq安装
  • Virtual Hosts
  • 入门案例(一个消费者)
  • 多个消费者的案例
    • 问题
      • 实现
      • 消息应答
        • 为什么要有这个
          • 自动应答
            • 手动应答
              • 消息自动重新入队
              • RabbitMQ 持久化
                • 为什么持久化
                  • 队列如何实现持久化
                  • 不要轮训分发(不公平分发)
                  • 预取值
                  • 发布确认
                    • 发布确认的策略
                      • 单个确认发布(在生产端)
                      • 批量确认发布(在生产端)
                      • 异步确认发布(在生产端)
                  • 什么是交换机
                    • 直接(direct)
                      • 主题(topic) 路由
                        • 标题(headers)
                          • 扇出(fanout)发布订阅
                          • 临时队列
                          • 绑定(bindings)
                          • 死信队列
                          • 延迟队列
                          • 高级发布确认
                          相关产品与服务
                          容器服务
                          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档