前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ生产者Confirm消息(三)

RabbitMQ生产者Confirm消息(三)

作者头像
无涯WuYa
发布2022-03-29 16:01:30
8770
发布2022-03-29 16:01:30
举报
文章被收录于专栏:Python自动化测试

RabbitMQ的特性是保障数据的一致性,稳定性和可靠性。但是如何来保障这些了?这就有了很多的保障机制。在前面的文章体系中也是介绍到RabbitMQ中的生产者负责把消息发送到Exchange,并不需要关心Queue是什么,那么问题就出现了,如果生产者发送的MQ消息消费者没有收到了?这如何可以做到前面说的数据的一致性以及可靠性了。我们可以结合现实的例子来看这部分,比如我向别人借了100元,然后我要了对方的银行卡号,把钱还给了对方,但是我给对方没有说,那么其实对方是不知道的,所以在对方的心理我始终还是欠他100元的,其实这样的案例在我实际的生活就出现过,当然是很多年前的事了,总是这过程确认反馈的机制。技术也是需要符合人性的,那么RabbitMQ为了做到数据的一致性的保障,在生产者端就有Confirm的确认机制。

一、Confirm确认消息

RabbitMQ消息队列服务器生产者Confirm确认消息可以具体总结为如下:

  • 生产者发送的消息发送到Exchange后,如果RabbitMQ的Broker收到消息,需要给生产者一个应答
  • 生产者会负责接收应答的消息,核心的目的是来确认发送的消息是否发送到Broker,这也是RabbitMQ

消息可靠性投递的完美设计。

二、Confirm交换图

生产者在发送消息后,需要MQ代理来确认是否消息发送成功,那么它的交换图可以梳理为如下:

三、Confirm案例实战

要想实现生产者的Confirm确认消息的机制,那么就需要在生产者端的代码中开启确认消息机制,然后在Channel上来进行具体的监听,如果成功,就会返回监听成功的信息。当然这个过程很难同步的实现,它是基于异步的机制来进行实现的,其实这也是很好理解的。因为同步可能存在超时以及堵塞的情况。

3.1、生产者代码

代码语言:javascript
复制
package com.example.rabbitmq.confirm;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ProducerConfirm
{
    private  static  final  String exchangeName="saas";
    private  static  final  String routyKey="saas";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.43.158.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        //指定消息的投递模式:消息的确认模式
        channel.confirmSelect();


        //通过channel来发送具体的数据信息
        String msg = "producer send message";
        channel.basicPublish(exchangeName, routyKey, null, msg.getBytes());

        //添加一个消息确认监听
        channel.addConfirmListener(new ConfirmListener()
        {
            //消息成功的返回信息
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException
            {
                System.out.println("message send success!");
            }

            //消息失败的返回机制
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException
            {
                System.out.println("message send failed!");
            }
        });
    }
}

在如上的代码中,首先在channel中指定了消息确认,也就是channel.confirmSelect(),然后也添加了生产者端的应答监听机制,如果是成功,就会调用handleAck,如果是失败就会调用handleNack的方法。

3.2、消费者代码

代码语言:javascript
复制
package com.example.rabbitmq.confirm;

import com.rabbitmq.client.*;

public class ConsumerConfirm
{
    private static final String EXCHANGE = "saas";
    private  static  final String queueName="saas";
    private  static  final  String routingKey="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.43.158.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();

            Channel channel=connection.createChannel();

            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

            channel.queueDeclare(queueName,true,false,false,null);

            channel.queueBind(queueName,EXCHANGE,routingKey);

            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {
                    String message=new String(body);
                    System.out.println("receive message is :"+message);
                };
            };
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

3.3、案例演示

在生产者端以及消费者端的代码中,可以很清晰的看到它的Exchange和routingKey都是一一对应的,那么也就是说生产者发送的消息到Exchange,然后在Exchange这层它的routingKey与Queue都是对应的,那么发送的消息是能够接收成功的。如下图显示的是生产者以及消费端的消息,具体如下:

当然在生产端开启消息的确认保障机制后,生产者就不能关闭它的连接数和channel,如果关闭的话就无法达到轮训监听确认消息的机制。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Confirm确认消息
  • 二、Confirm交换图
  • 三、Confirm案例实战
    • 3.1、生产者代码
      • 3.2、消费者代码
        • 3.3、案例演示
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档