Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >RabbitMQ 学习(七)----发布确认

RabbitMQ 学习(七)----发布确认

作者头像
RAIN7
发布于 2022-10-07 05:54:15
发布于 2022-10-07 05:54:15
50500
代码可运行
举报
运行总次数:0
代码可运行

文章目录

RabbitMQ 学习(七)----发布确认

发布确认是一个保证RabbitMQ 可靠性的一个机制

  保证生产者将信息成功的发送到 RabbitMQ的 server端了,那么broker就会回一个确认,如果没有收到或者收到拒绝信息,那么说明可能网络不好没有发送成功,server端宕机了,broker拒绝接收等情况,如果不进行后续处理,那么信息就会丢失,生产者收到失败的消息使用回调函数在进行处理。

  生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会指定一个唯一的ID,一旦消息投递到队列中,就是发送成功了,broker会立刻发送一个确认ack 给生产者,这个时候,生产者就知道消息已经发送成功了。

  如果队列和信息是持久化的,那么确认消息会在将消息写入磁盘之后再发出,broker返回的确认包含 确认消息的序列号,还可以设置 multiple,表示此序号前的所有消息都得到了处理。

  一旦发布消息,生产者等待确认的同时继续发送下一条消息,如果rabbitMq自身内部错误导致消息为发送成功,生产者就可以再回调方法中继续处理。

为了保证RabbitMQ的可靠性,生产者怎么做?

1、设置要求队列持久化

2、设置队列中的消息持久化

3、发布确认,保证写入磁盘,broker成功收到

(1)开启发布确认的方法

  channel 的cofirm模式默认是没有开启的,如果需要开启需要调用 confirmSelect(),当我们使用发布确认的时候。需要使用channel调用该方法。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Channel channel =connection.createChannel();
channel.confirmSelect();

(2)单个发布确认

  这是一种同步确认发布的方式,就是发布一个消息之后等待确认后,后续的消息才能继续发布。waitForConfiemOrDie(long) 这个方法只有当消息被确认才会返回,如果在指定的时间内未返回就会抛出异常。

  这种确认方式最大的缺点:速度特别慢。如果消息没有确认,就会阻塞后续消息的发送,造成发送消息的速度很慢。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class SingleConfirm {
    /**
     * 发布确认模式
     * 1、单个确认
     * @param args
     */
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i++) {

                String message = i+"";

                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                // 单个消息发送之后,马上发布确认,使用 waitForConfirms
                if(channel.waitForConfirms()){
                    //System.out.println("消息发送成功:"+i);
                }

            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用单个发布确认的时间为:"+(end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}

非常浪费时间

(3)批量确认发布

  每发送一部分消息,批量同步确认一次,若有消息无法发出,该模式无法确认是哪个消息无法发送;

发布1000条消息,每发送100条确认一次

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class MultipleConfirm {
    public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            long begin = System.currentTimeMillis();

            // 批量确认消息的数量,没发送100个返回一个确认ack
            int notAck = 100;

            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i++) {

                String message = i+"";

                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

                if((i+1)%notAck==0){
                    // 每发送100条确认一次,查看是否这一批是否有发送失败的情况
                    channel.waitForConfirmsOrDie();
                }


            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用批量发布确认的时间为:"+(end-begin));

        } catch (Exception e){
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel, connection);
        }

    }
}

时间为 396毫秒

(4)异步发布确认

  生产者发送消息与 接收确认这两个步骤不是同步的,是异步的,生产者只管发送,同时使用监听(addConfirmListener)返回的确认,对成功确认、失败确认两种情况分别进行处理。非常高效且安全

  • 开启确认模式
  • 声明确认成功的callback
  • 声明确认失败的callback
  • 开启确认监听 addConfirmListener() ,设置callback
  • 信道发送消息,不需要额外设置接收waitForConfirm什么的
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
 public static void main(String[] args) {
        Connection connection = RabbitMQUtils.getConnect();
        Channel channel = null;

        try {

            channel = connection.createChannel();

            // 开启确认模式
            channel.confirmSelect();

            // 声明队列
            channel.queueDeclare("confirm", true, false, false, null);

            // 作为接收成功的函数式接口 参数
            ConfirmCallback ackCallback =(deliveryTag, multiple)-> System.out.println("确认的消息: "+deliveryTag);
                // 表示接收成功的回调函数


            // 作为接收失败的函数式接口 参数
            ConfirmCallback nackCallback = (deliveryTag,multiple)-> System.out.println("接收失败!");
                // 表示接收失败的回调函数


            // 这是一个异步的监听 消息返回确认信息的 反应
            channel.addConfirmListener(ackCallback,nackCallback);

            long begin = System.currentTimeMillis();


            // 批量发送消息,每次发送进行确认
            for (int i = 0; i <1000 ; i++) {
                String message = i+"";
                // 发布单条消息
                channel.basicPublish("", "confirm", null, message.getBytes());

            }

            long end = System.currentTimeMillis();

            System.out.println("发送1000条数据,使用异步发布确认的时间为:"+(end-begin)+"ms");

        } catch (Exception e){
            e.printStackTrace();
        }
        
        // 执行完不能关闭连接,还要继续监听确认的信息
        /**
         finally {
            RabbitMQUtils.close(channel, connection);
        }
         */

    }

时间非常高效

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RabbitMQ发布确认
  生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。   confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
别团等shy哥发育
2023/02/25
6840
RabbitMQ发布确认
RabbitMQ 消息确认机制
消息的确认,是指生产者投递消息后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker ,这种方式也是消息的可靠性投递的核心保障!
海向
2019/09/23
1.1K0
RabbitMQ 消息确认机制
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
消息队列RabbitMQ提供了六种工作模式:简单模式、work queues、发布订阅模式、路由模式、主题模式、发布确认模式。本文将介绍前三种工作模式。所有的案例代码都是使用Java语言实现。
百思不得小赵
2022/12/07
5600
消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
MQ发布确认
  生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
一个风轻云淡
2022/11/13
1.2K0
MQ发布确认
RabbitMQ 的消息确认机制(图文+代码)详解!
出处:www.cnblogs.com/haixiang/p/10900005.html
Java技术栈
2020/11/23
1.9K0
RabbitMQ 的消息确认机制(图文+代码)详解!
RabbitMQ发布确认详解
小熊学Java
2023/07/16
2280
RabbitMQ发布确认详解
RibbitMQ学习笔记之MQ发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,**所有在该信道上面发布的消息都将会被指派一个唯一的 ID( 从 1 开始),**一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
默 语
2024/11/20
650
RibbitMQ学习笔记之MQ发布确认
RabbitMQ 消息应答与发布
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。
用户9615083
2022/12/25
4360
RabbitMQ 消息应答与发布
RabbitMQ发布确认
RabbitMQ的发布确认(Publish Confirm)是一种机制,用于确保消息在发送到RabbitMQ之后被成功接收和持久化。通过使用发布确认,生产者可以获得对消息的可靠性保证,避免消息丢失。
堕落飞鸟
2023/05/16
6840
【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
上篇文章介绍了rabbitmq的基本知识、交换机类型实战《【消息队列之rabbitmq】学习RabbitMQ必备品之一》 这篇文章主要围绕着消息确认机制为中心,展开实战;接触过消息中间件的伙伴都知道,消息会存在以下问题: 1、消息丢失问题和可靠性投递问题; 2、消息如何保证顺序消费; 3、消息如何保证幂等性问题,即重复消费问题等等… 本文主要以Rabbitmq消息中间件解决问题一的实践,其他问题小编会重新写文章总结;
沁溪源
2022/01/13
1.2K0
【消息队列之rabbitmq】Rabbitmq之消息可靠性投递和ACK机制实战
RabbitMQ之消息应答与发布确认
生产者发布消息到 RabbitMQ 后,需要 RabbitMQ 返回「ACK(已收到)」给生产者,这样生产者才知道自己生产的消息成功发布出去。
shaoshaossm
2022/12/27
5590
RabbitMQ之消息应答与发布确认
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是指生产者发送消息后,等待消费者确认消息已经被正确接收和处理的一种机制。消息确认机制的主要目的是确保消息的可靠传递和处理,以避免消息丢失或重复处理的情况发生。
GeekLiHua
2025/01/21
950
RabbitMQ批量发布确认
RabbitMQ的批量发布确认(Batch Publish Confirm)是一种机制,用于在发送大量消息时提高生产者的吞吐量和性能。通过批量发布确认,生产者可以一次性发送多条消息,并等待这批消息的确认回调,而不是每条消息单独等待确认。
堕落飞鸟
2023/05/16
1.4K0
RabbitMQ 七种工作模式介绍
RabbitMQ 共提供了7种⼯作模式供我们进⾏消息传递,接下来一一介绍它的实现与目的
用户11369558
2024/11/20
1080
RabbitMQ 七种工作模式介绍
RabbitMQ生产者Confirm消息(三)
RabbitMQ的特性是保障数据的一致性,稳定性和可靠性。但是如何来保障这些了?这就有了很多的保障机制。在前面的文章体系中也是介绍到RabbitMQ中的生产者负责把消息发送到Exchange,并不需要关心Queue是什么,那么问题就出现了,如果生产者发送的MQ消息消费者没有收到了?这如何可以做到前面说的数据的一致性以及可靠性了。我们可以结合现实的例子来看这部分,比如我向别人借了100元,然后我要了对方的银行卡号,把钱还给了对方,但是我给对方没有说,那么其实对方是不知道的,所以在对方的心理我始终还是欠他100元的,其实这样的案例在我实际的生活就出现过,当然是很多年前的事了,总是这过程确认反馈的机制。技术也是需要符合人性的,那么RabbitMQ为了做到数据的一致性的保障,在生产者端就有Confirm的确认机制。
无涯WuYa
2022/03/29
9000
RabbitMQ生产者Confirm消息(三)
RabbitMQ如何保证消息99.99%被发送成功?
要想保证消息不丢失,首先我们得保证生产者能成功的将消息发送到RabbitMQ服务器。
Java_老男孩
2019/06/06
9950
RabbitMQ---消息队列---上半部分
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
大忽悠爱学习
2021/12/07
1.1K0
RabbitMQ---消息队列---上半部分
四种途径提高RabbitMQ传输数据的可靠性(二)
针对问题(1),我们可以通过生产者的确认消息机制来解决,主要分为两种:第一是事务机制、第二是发送方确认机制
java架构师
2019/03/08
5580
四种途径提高RabbitMQ传输数据的可靠性(二)
RabbitMQ
​ MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。
OY
2022/03/21
1.8K0
RabbitMQ
029. RabbitMQ 消息可靠性和插件机制
1. 消息可靠性 ---- RabbitMQ 的消息可靠性,一般是业务系统接入消息中间件时首要考虑的问题,一般通过三个方面保障: 发送可靠性:确保消息成功发送到 Broker。 存储可靠性:Broker 对消息持久化,确保消息不会丢失。 消费可靠性:确保消息成功被消费。 1. 发送可靠性 一般消息发送可靠性分为 3 个层级: At most once:最多一次,消息可能会丢失,但绝不会重复传输。 At least once:最少一次,消息绝不会丢失,但可能会重复传输。 Exactly once:
山海散人
2021/03/03
3970
029. RabbitMQ 消息可靠性和插件机制
相关推荐
RabbitMQ发布确认
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文