前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ生产者消费者模型(二)

RabbitMQ生产者消费者模型(二)

作者头像
无涯WuYa
发布2022-03-29 16:00:56
5250
发布2022-03-29 16:00:56
举报
文章被收录于专栏:Python自动化测试

作为主流的MQ消息队列中间件,RabbitMQ也是具备了生产者消费者的模型,那么也就是说生产者把消息发送后,消费者来作为接收具体的消息。本文章主要详细的概述RabbitMQ的生产者投递和消费者监听。

一、消息传递流程

下面主要详细的总结下RabbitMQ消息队列服务器的整体流程,具体汇总如下:

  • 生产者只负责把消息投递到Exchange,这个过程不需要刻意的关注Queue
  • 而由Exchange把消息传递给Queue
  • 作为消费者的程序来负责监听Queue的消息
  • 为了保障消息传递的准确性以及及时性,Exchange与Queue会存在一定的绑定关系就是路由Key

二、MQ投递

依据RabbitMQ的架构模型,在生产者模型和消费者模型中,其实生产者和消费者并不知道对方的存在,这是异步通信的特性。作为生产者,它只需要把消息投递到Exchange,在这个过程中生产者并不需要关注Queue,事实上生产者也是无法关注到Queue的,那么消息是如何让消费者来监听并且接收消息了?这就是说会在Exchange和Queue之间建立一种映射关系,而这层关系就不是生产者所需要关注的了。作为消费者也不需要刻意的关注Exchange,而只需要监听Queue。

2.1、引入RabbitMQ的jar

要使用RabbitMQ的前提是需要引入RabbitMQ的jar,那么就需要在pom.xml文件里面新增RabbitMQ

的服务端和客户端,具体如下:

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

2.2、生产者投递步骤

生产者把消息需要投递给Exchange,那么它的步骤具体总结如下:

代码语言:javascript
复制
ConnectionFactory类负责获取连接工厂
Connection类的对象获取一个连接
Channel创建数据通道信道,可以发送和接收消息

下面具体是完整的生产者投递的代码,具体如下:

代码语言:javascript
复制
package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer
{
    public static void main(String[] args) throws  Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();
        String msg = "Hello RabbitMQ";
        channel.basicPublish("saas", "", null, msg.getBytes());
        channel.close();
        connection.close();
    }
}

在如上中,我们可以看到我们首先需要连接到RabbitMQ的服务器,然后在发送消息message的时候我们需要指定具体的Exchange,因为对于生产者来说,它只关注的是把消息投递给Exchange。

2.3、消费者监听

生产者把消息投递到Exchange,那么作为消费者就需要来监听具体的消息了。监听的整个过程首先也是

需要建立RabbitMQ的服务器,这部分涉及到的代码具体如下:

代码语言:javascript
复制
package com.example.rabbitmq.quickstart;

import com.rabbitmq.client.*;

public class Consumer
{
    private static final String EXCHANGE = "saas";
    private  static  final String queueName="saas";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");
            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            channel.exchangeDeclare(EXCHANGE,BuiltinExchangeType.FANOUT);

            channel.queueDeclare(queueName,true,false,false,null);

            channel.queueBind(queueName,EXCHANGE,"");

            //创建一个消费者来消费数据
            DefaultConsumer consumer=new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(
                        String consumerTag,
                        com.rabbitmq.client.Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte [] body) throws  java.io.IOException
                {
                    String message=new String(body);
                    System.out.println("接收到的消息为:"+message);
                };
            };
            System.out.println("消费者程序启动成功,准备接收生产者的数据:\n");
            channel.basicConsume(queueName,consumer);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

在如上中,我们看到Exchange与生产端的Exchange名字是一样的,那么只有这样才能够建立绑定关系,再说的更加简单点来说,生产者把消息给到Exchange,然后Exchange与Queue之间有一个层映射关系,那么只有这样消费者监听队列才能够收取message的消息。

2.4、绑定关系

刚才说到Exchange与Queue之间的绑定关系,下面就针对这部分具体的演示下。我们先启动消费者

的程序,启动成功后,就会自动的创建Exchange和Queue,就可以从Exchange的绑定以及Queue的绑定

中能够获取到对应的绑定关系。

2.4.1、Exchange绑定关系

下面的图是消费者的程序启动后创建的Exchange,以及它的绑定关系,具体如下:

2.4.2、消费者绑定关系

在Exchange的绑定关系中,点击To里面saas,就会自动的跳转到Queue,具体如下所示:

2.5、406错误避免

很多初学者在学习RabbitMQ的时候,总是提前创建好Exchange和Queue,这样结果导致消费者的程序报很多的错误,具体错误如下:

代码语言:javascript
复制
java.io.IOException
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
	at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:783)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:252)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:242)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:222)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.exchangeDeclare(AutorecoveringChannel.java:227)
	at com.example.rabbitmq.quickstart.Consumer.main(Consumer.java:31)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
	... 6 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'saas' in vhost '/': received 'fanout' but current is 'direct', class-id=40, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:739)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
	at java.lang.Thread.run(Thread.java:748)

其实遇到该问题,最简单解决问题的方式就是删除自己创建的Exchange和Queue。删除后,再次执行消费者的程序,它会自动创建Exchange和Queue,而且也就不会再报一系列的具体问题了。解决了如上的问题后,再次执行生产者的程序,就可以看到生产者发送的消息就能够被消费者这边监听到。感谢您的阅读。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、消息传递流程
  • 二、MQ投递
    • 2.1、引入RabbitMQ的jar
      • 2.2、生产者投递步骤
        • 2.3、消费者监听
          • 2.4、绑定关系
            • 2.4.1、Exchange绑定关系
            • 2.4.2、消费者绑定关系
          • 2.5、406错误避免
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档