前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ生产者Confirm消息保障(四)

RabbitMQ生产者Confirm消息保障(四)

作者头像
无涯WuYa
发布2022-03-29 16:02:59
6910
发布2022-03-29 16:02:59
举报
文章被收录于专栏:Python自动化测试

RabbitMQ生产者Confirm消息中介绍了RabbitMQ生产者端的消息确认的机制,也就是在生产者端把消息发送成功后进行消息的应答机制,但是如果生产者端发送的消息根本没有发送成功了?那么针对这种情况也是需要一种对应的解决方案来进行处理。针对这种特殊的情况RabbitMQ提供了Return消息保障的机制。

一、什么是Return消息保障

当消息生产者端把消息发送到Exchange和RoutingKey的时候,然后Exchange与Queue会形成一定的映射机制来消息发送的消息,这是一个正常的MQ生产消费的机制。但是在某些特殊的情况下,生产者发送的消息到Exchnage,但是Exchange不存在,还有一种情况是RoutingKey路由不到,导致生产者发送的消息无法让消费者来进行消费,针对这种情况生产者需要监听不可达的消息机制,也就是ReturnListener。在RabbitMQ消息队列服务器中,专门有一个配置来解决这种情况,具体配置的参数就是mandatory,它是一个boolean,如果配置的是true,那么意味着监听器会接收到路由不可达的消息,然后进行后续逻辑上的处理,但是如果配置参数是false,那么MQ的Broker就会自动删除该MQ的message。

二、Return消息机制流程

通过如下交互图来呈现Return消息保障的机制,具体如下:

三、Return代码实现

3.1、生产者代码

代码语言:javascript
复制
package com.example.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ProducerReturn
{
    private  static  final  String exchangeName="";
    private  static  final  String routyKey="return.save";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        String msg = "producer message return listener";
        channel.basicPublish(exchangeName, routyKey, null, msg.getBytes());

        /*
        * 添加发送消息前进行确认的机制,ReturnListener里面会提供一个接口的机制来进行保障
        * */
        channel.addReturnListener(new ReturnListener()
        {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException
            {
                System.out.println("-----------------handle return---------------------");
                System.out.println("replyCode:"+replyCode);
                System.out.println("replyText:"+replyText);
                System.out.println("exchange:"+exchange);
                System.out.println("routingKey:"+routingKey);
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        });

        //使用mandtory的方式来发送消息,该参数设置为true,如果接收失败,那么就会输出接口监听到的信息
        channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
    }
}

备注:在如上监听的接口的方法中,replyCode值的是返回的状态码,replyText值的是返回的文本信息。这地方故意写的是Exchange为空。同时在basicPublish中,参数mandatory参数设置的是true。

3.2、消费者代码

代码语言:javascript
复制
package com.example.rabbitmq;

import com.rabbitmq.client.*;

public class ConsumerReturn
{
    private static final String EXCHANGE = "test_return_exchange";
    private  static  final String queueName="test_return_queue";
    private  static  final  String routingKey="return.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();


            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE,routingKey);


            // 监听队列,从队列中获取数据
            channel.basicConsume(queueName,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

备注:如上是消费者的代码,基本没多少区别的,只是把消费者接收消息的部分单独的分离了出来,分离出来的代码具体如下所示:

代码语言:javascript
复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("consumerTag:"+consumerTag);
        System.err.println("envelope:"+envelope);
        System.err.println("properties:"+properties);
        System.err.println("the message received:"+new String(body));

        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

3.3、执行结果

如上代码执行后,生产者端就会返回消息,也就是说在生产者发送消息的时候,找不到Exchange,这个时候消息肯定是无法发送的,我们开启了生产者端的监听,监听返回的信息如下:

代码语言:javascript
复制
-----------------handle return---------------------
replyCode:312
replyText:NO_ROUTE
exchange:
routingKey:return.save
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:producer message return listener

如上,我们就可以看到发送失败后,监听到了对应的消息,这个时候就是另外一层逻辑的判断,比如我们可以写一个判断,如果判断返回的状态码是312,然后重新创建Exchange,然后再次发送,也就是“重试”,这个在稳定性体系中是非常重要的一个环节。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、什么是Return消息保障
  • 二、Return消息机制流程
  • 三、Return代码实现
    • 3.1、生产者代码
      • 3.2、消费者代码
        • 3.3、执行结果
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档