前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ消息应答

RabbitMQ消息应答

作者头像
别团等shy哥发育
发布2023-02-25 14:34:07
5660
发布2023-02-25 14:34:07
举报
文章被收录于专栏:全栈开发那些事

RabbitMQ消息应答

1、概念

  消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了部分它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该对象的消息,因为它无法接收到。   为了保证消息在发送过程中不丢失,RabbitMQ引入了消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ可以把该消息删除了

2、自动应答

  消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效以某种速率能够处理这小消息的情况下使用

3、消息应答的方法

代码语言:javascript
复制
Channel.basicAck(用于肯定确认)
RabbitMQ已经知道该消息并且成功的处理消息,可以将其丢弃了。
代码语言:javascript
复制
Channel.basicNack(用于否定确认)
代码语言:javascript
复制
Channel.basicReject(用于否定确认)
 与Channel.basicNack相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

4、Multiple的解释

手动应答的好处是可以批量应答并且减少网络拥堵

代码语言:javascript
复制
channel.basicAck(deliveryTag,true)

multipletruefalse代表不同意思 true代表批量应答channel上未应答的消息,比如说channel上有传送tag的消息 5,6,7,8 当前tag是8 那么此时5-8的这些还未应答的消息都会被确认收到消息应答。 false同上面相比,只会应答tag=8的消息,5,6,7这三个消息依然不会被确认收到消息应答

在这里插入图片描述
在这里插入图片描述

5、消息自动重新入队

  如果消费者由于某些原因失去连接(其通道已经关闭,连接已关闭或TCP连接丢失),导致消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

在这里插入图片描述
在这里插入图片描述

如上图所示,若消费者C1的连接突然断了,那么它就没有发送ACK确认,那么RabbitMQ会将该消息重新入队,如果此时消费者C2可以处理,那就将该消息交给C2

6、消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

6.1 启动RabbitMQ

在虚拟机中执行命令systemctl start rabbitmq-server 启动之后查看下状态systemctl status rabbitmq-server

在这里插入图片描述
在这里插入图片描述

6.2 消息生产者

代码语言:javascript
复制
/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Task2 {

    //队列名称
    public static final String TASK_QUEUE_NAME="ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        //声明队列
        boolean durable=true;   //需要让QUEUE进行持久化
        channel.queueDeclare(TASK_QUEUE_NAME,durable,false,false,null);
//        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
        //从控制台中输入信息
        Scanner scanner=new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);
        }
    }
}

6.3 消费者01

代码语言:javascript
复制
package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Work03 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息处理时间较短");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            //沉睡1秒
            SleepUtils.sleep(1);

            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 参数1:消息的标记 tag
             * 参数2:是否批量应答 false:不批量应答信道中的消息 true:批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}

6.4 消费者02

代码语言:javascript
复制
package com.atguigu.rabbitmq.three;

import com.atguigu.rabbitmq.utils.RabbitMqUtils;
import com.atguigu.rabbitmq.utils.SleepUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息在手动应答时是不丢失的、放回队列中重新消费
 */
public class Work04 {

    public static final String TASK_QUEUE_NAME="ack_queue";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息处理时间较长");

        DeliverCallback deliverCallback=(consumerTag, message) -> {
            //沉睡1秒
            SleepUtils.sleep(30);

            System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));
            //手动应答
            /**
             * 参数1:消息的标记 tag
             * 参数2:是否批量应答 false:不批量应答信道中的消息 true:批量应答
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(TASK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag -> {
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        }));
    }
}

6.5 手动应答效果演示

我们在生产者中发送四条消息,观察消费者C1和C2的处理过程,其中 我们设置消费者C1每处理一条消息休眠1秒 消费者C2每处理一条消息休眠30秒 生产者先发送四条消息

在这里插入图片描述
在这里插入图片描述

观察消费者C1和C2(默认使用的是轮询分发) 消费者C1处理了两条

在这里插入图片描述
在这里插入图片描述

消费者C2,由于消费者C2每30秒才能接收一条消息,所以这里还看不到它处理完成的响应。

在这里插入图片描述
在这里插入图片描述

这个时候观察队列中的消息,从下图可看出还有两条消息没有被接收,这是由于C2接收消息的速度太慢(我们故意设置的慢)

在这里插入图片描述
在这里插入图片描述

这里我们假设消费者C2的连接中断了,也就是我们关闭消费者C2的线程(模仿消费者C2连接失败)。 我们查看下消费者C1,可以看到,消费者C2没有处理完成的消息被重新入队,最后由RabbitMQ交给了消费者C1处理。

在这里插入图片描述
在这里插入图片描述
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-04-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RabbitMQ消息应答
  • 1、概念
  • 2、自动应答
  • 3、消息应答的方法
  • 4、Multiple的解释
  • 5、消息自动重新入队
  • 6、消息手动应答代码
    • 6.1 启动RabbitMQ
      • 6.2 消息生产者
        • 6.3 消费者01
          • 6.4 消费者02
            • 6.5 手动应答效果演示
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档