前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式--RabbitMQ入门

分布式--RabbitMQ入门

作者头像
aruba
发布2022-09-19 15:41:25
5230
发布2022-09-19 15:41:25
举报
文章被收录于专栏:android技术android技术

一、简介

1. AMQP

分布式项目中,模块与模块之间的通信可以使用RPC框架,如Dubbo,但RPC中调用方模块获取到被调用方的结果是同步的,争对一些只需要异步调用的方法,如日志存储、发送消息等,RPC就显得效率低下了,AMQP协议的推出就是用来解决进程之间的异步消息通信

AMQP

从设计上来说,AMQP就是一个发布订阅者模式,整体可以看作一个流,核心是中间的管道,即消息队列 有了AMQP,发布者只需要关注发布消息,订阅者只需要关注订阅消息,而流的速度、总承载量等都交由AMQP管理,从而做到异步调用、削峰填谷、服务解耦

2. RabbitMQ

RabbitMQ也是实现了AMQP的一种消息中间件,由Erlang编写,由于Erlang语言对并发的特性,RabbitMQ相对于其他MQ(kafka、RocketMQ等),延迟最低

二、安装

1. Erlang安装

安装编译Erlang所使用的工具:

代码语言:javascript
复制
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

下载解压Erlang源码,erlang安装版本需要和rabbitmq对应,查看网址:https://www.rabbitmq.com/which-erlang.html#compatibility-matrix

代码语言:javascript
复制
wget http://erlang.org/download/otp_src_23.2.tar.gz
tar -xvf otp_src_23.2.tar.gz

编译:

代码语言:javascript
复制
cd otp_src_23.2
mkdir -p /usr/local/erlang
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

安装:

代码语言:javascript
复制
make install

配置全局环境变量:

代码语言:javascript
复制
vi /etc/profile
代码语言:javascript
复制
export ERLANGROOT=/usr/local/erlang
export PATH=$PATH:$ERLANGROOT/bin
2. RabbitMQ安装

争对Erlang版本为23.2,我们安装RabbitMQ的3.8.35版本,其他版本可以从githbu上查看:https://github.com/rabbitmq/rabbitmq-server/tags

下载并解压:

代码语言:javascript
复制
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.35/rabbitmq-server-generic-unix-3.8.35.tar.xz
mkdir /usr/local/rabbitmq
tar -xvf rabbitmq-server-generic-unix-3.8.35.tar.xz -C /usr/local/rabbitmq/

生效界面插件:

代码语言:javascript
复制
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_management

启动RabbitMQ:

代码语言:javascript
复制
./rabbitmq-server -detached

停止RabbitMQ:

代码语言:javascript
复制
./rabbitmqctl stop_app

访问15672端口:

三、账号管理

管理界面可以在本地使用guest账号(密码与账号相同)登录,但外部访问是不允许的,需要为RabbitMQ添加账号

1. 创建账号
代码语言:javascript
复制
./rabbitmqctl add_user aruba aruba
2. 授予角色
代码语言:javascript
复制
./rabbitmqctl set_user_tags aruba administrator
3. 账号授权
代码语言:javascript
复制
./rabbitmqctl set_permissions -p "/" aruba ".*" ".*" ".*"

"/"对应默认的虚拟主机

4. 登录

使用新的账号进行登录:

四、RabbitMQ架构

下面是RabbitMQ实现原理的细节部分

针对上面名词的解释:

名词

解释

Exchange 交换机

接收发布者消息,并将消息通过路由规则路由到相应队列

Binding key

消息队列和交换器之间的关联,通过路由键Routing-key绑定

Channel 信道

TCP里面的虚拟链接。发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。由于TCP连接数有上限,且必须通过三次握手,建立连接时性能低下,相比于每个连接都使用TCP,一条TCP连接可以容纳无限的信道

Virtual Host

虚拟主机。表示一批交换器,消息队列和相关对象。默认创建一个为"/"的虚拟主机

我们主要关注RabbitMQ如何发送消息和订阅消费消息,首先RabbitMQ服务中交换机和队列通过Routing-key进行关联,再由发布者与RabbitMQ服务通过Channel建立连接后,通过Routing-key发送消息到交换机,最后由交换机对消息进行路由匹配到相应队列后,由队列进行分发 而订阅消费消息,一个消息只能有一个消费者成功的消费,收到消息后需要发送ack,告诉RabbitMQ服务成功的消费了该消息,针对多个消费者订阅一个队列的情况,RabbitMQ默认使用轮询的方式发送给不同的消费者

五、RabbitMQ通讯方式

Rabbit提供的通讯方式,可以从官网查看:https://rabbitmq.com/getstarted.html 分别为:

通讯方式

描述

Hello World!

为了入门操作,使用默认交换机,一个队列被一个消费者订阅

Work queues

使用默认交换机,一个队列可以被多个消费者订阅

Publish/Subscribe

手动创建交换机(FANOUT),一个消息可以路由到多个队列中

Routing

手动创建交换机(DIRECT),按照routing-key进行路由匹配

Topics

手动创建交换机(TOPIC),routing-key支持通配符匹配

RPC

RPC方式

Publisher Confirms

保证消息可靠性

创建一个项目,导入依赖:

代码语言:javascript
复制
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>

连接工具类:

代码语言:javascript
复制
public class RBConnectionUtil {
    public static final String RABBITMQ_HOST = "192.168.42.4";

    public static final int RABBITMQ_PORT = 5672;

    public static final String RABBITMQ_USERNAME = "guest";

    public static final String RABBITMQ_PASSWORD = "guest";
    // 虚拟主机
    public static final String RABBITMQ_VIRTUAL_HOST = "/";

    /**
     * 构建RabbitMQ的连接对象
     *
     * @return
     */
    public static Connection getConnection() throws Exception {
        //1. 创建Connection工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置RabbitMQ的连接信息
        factory.setHost(RABBITMQ_HOST);
        factory.setPort(RABBITMQ_PORT);
        factory.setUsername(RABBITMQ_USERNAME);
        factory.setPassword(RABBITMQ_PASSWORD);
        factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);

        //3. 返回连接对象
        Connection connection = factory.newConnection();
        return connection;
    }
    
}
1. Hello World!

Hello World!使用的是默认的交换机

生产者:

代码语言:javascript
复制
public class Publisher {

    private static final String QUEUE_NAME = "hello";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 发送消息
        String message = "hello rabbit";
        //参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数  消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }

}

运行后,管理界面可以看到队列中有了一条消息:

消费者:

代码语言:javascript
复制
public class Consumer {

    private static final String QUEUE_NAME = "hello";

    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }
    
}

运行结果:

2. Work queues

Work queues也是使用默认交换机,发送消息是一样的,只不过一个队列被两个消费者订阅

2.1 轮询消费

上面提到过,一个消息只能被一个消费者消费,RabbitMQ默认使用轮询方式分发给不同的消费者

生产者:

代码语言:javascript
复制
public class Publisher {

    private static final String QUEUE_NAME = "work";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 发送消息
        for (int i = 0; i < 10; i++) {
            //参数: 交换机(默认为"") routing-Key(默认交换机就是队列名) 消息其他参数  消息
            channel.basicPublish("", QUEUE_NAME, null, String.valueOf(i).getBytes());
        }
    }

}

消费者:

代码语言:javascript
复制
public class Consumer {

    private static final String QUEUE_NAME = "work";

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, true, callback);

        System.in.read();
    }

}

运行结果:

2.2 饿汉消费

针对上面的方式,每个消费者不管性能如何,都会按照轮询方式进行分发,如果消费者1消费一个消息需要200ms消费者2消费一个消息需要1000ms,消费完10个消息最少需要5s,那么如何解决这个问题?

手动ack+消息流控:自动ack当接收到消息时就会发送ack给RabbitMQ服务,我们需要在执行完逻辑处理后,手动执行ack,还需要设置消息流控(一次拿多少条消息),才能实现争抢消息

消费者:

代码语言:javascript
复制
public class Consumer {

    private static final String QUEUE_NAME = "work";

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.1 流控设置
        channel.basicQos(1);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
                //手动ack  参数:deliveryTag 是否批量操作
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, false, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //3.1 流控设置
        channel.basicQos(1);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));
                //手动ack  参数:deliveryTag 是否批量操作
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(QUEUE_NAME, false, callback);

        System.in.read();
    }

}

运行结果:

3. Publish/Subscribe

通过Work queues我们知道了一条消息只能被一个消费者消费,而实际开发中,一条消息需要被多个消费者消费的情况很多 Publish/Subscribe就是为了解决这个问题而产生的,队列中的一条消息只能被一个消费者消费,而不同队列中可以存放相同的消息,Publish/Subscribe使得将一条消息路由到多个队列,进而被多个消费者订阅消费

Publish/Subscribe需要手动创建交换机(FANOUT),并手动绑定多个队列

生产者:

代码语言:javascript
复制
public class Publisher {

    public static final String QUEUE_NAME1 = "p1";
    public static final String QUEUE_NAME2 = "p2";
    public static final String EXCHANGE_NAME = "ps";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key(直接绑定为空即可)
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "");

        //6. 发送消息
        //参数: 交换机 routing-Key(直接绑定不需要) 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "", null, "publish/subscribe!".getBytes());
    }

}

消费者:

代码语言:javascript
复制
public class Consumer {

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);

        System.in.read();
    }

}

运行结果:

4. Routing

Routing不仅需要手动创建交换机(DIRECT),还需要在绑定队列时指定routing-key,消息的发送是根据routing-key来路由到不同的队列

生产者:

代码语言:javascript
复制
public class Publisher {

    public static final String QUEUE_NAME1 = "q1";
    public static final String QUEUE_NAME2 = "q2";
    public static final String EXCHANGE_NAME = "routing";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "a");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "b");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c");

        //6. 发送消息
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "a", null, "hi a".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "b", null, "hi b".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "c", null, "hi c".getBytes());
    }

}

消费者和Publish/Subscribe的代码相同:

代码语言:javascript
复制
public class Consumer {

    @Test
    public void consumer1() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME1, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME1, true, callback);

        System.in.read();
    }

    @Test
    public void consumer2() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_NAME2, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2获取到消息:" + new String(body, "UTF-8"));
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_NAME2, true, callback);

        System.in.read();
    }

}

运行结果:

5. Topic

Topic的路由规则可以通过通配符进行匹配

  • *:表示占位符
  • #:表示通配符

生产者:

代码语言:javascript
复制
public class Publisher {

    public static final String QUEUE_NAME1 = "topic-q1";
    public static final String QUEUE_NAME2 = "topic-q2";
    public static final String EXCHANGE_NAME = "topic";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        //4. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_NAME1, false, false, false, null);
        channel.queueDeclare(QUEUE_NAME2, false, false, false, null);

        //5. 交换机与队列绑定 参数: 队列名 交换机名 Routing-Key
        channel.queueBind(QUEUE_NAME1, EXCHANGE_NAME, "*.a.*");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "*.b");
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "c.#");

        //6. 发送消息
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish(EXCHANGE_NAME, "1.a.3", null, "hi a".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "2.b", null, "hi b".getBytes());
        channel.basicPublish(EXCHANGE_NAME, "c.1.2.3", null, "hi c".getBytes());
    }

}

消费者代码相同,运行结果:

6. RPC

RPC方式就是生产者发送消息时附带一些信息,消费者消费时,又通过另一个队列发送返回信息给生产者

生产者不仅仅要发送消息,还要订阅另一个队列:

代码语言:javascript
复制
public class Publisher {

    public static final String QUEUE_PUB = "topic-publisher";
    public static final String QUEUE_CON = "topic-consumer";

    @Test
    public void publisher() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 创建信道
        Channel channel = connection.createChannel();

        //3. 构建队列  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(QUEUE_PUB, false, false, false, null);
        channel.queueDeclare(QUEUE_CON, false, false, false, null);

        //4. 监听消费者发送消息
        channel.basicConsume(QUEUE_CON, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者发送的消息:" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

        //5. 发送消息
        // 构建额外的信息
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .replyTo(QUEUE_CON)
                .correlationId(UUID.randomUUID().toString())
                .build();
        //参数: 交换机 routing-Key 消息其他参数  消息
        channel.basicPublish("", QUEUE_PUB, props, "hi consumer".getBytes());

        System.in.read();
    }

}

消费者除了订阅消息外,还需要做相应的返回消息处理:

代码语言:javascript
复制
public class Consumer {

    @Test
    public void consumer() throws Exception {
        //1. 获取连接对象
        Connection connection = RBConnectionUtil.getConnection();

        //2. 构建信道
        Channel channel = connection.createChannel();

        //3. 构建队列 与生产者相同  参数:队列名 是否持久化 是否排外(只允许一个消费者) 长时间未使用是否自动删除 其他参数
        channel.queueDeclare(Publisher.QUEUE_PUB, false, false, false, null);
        channel.queueDeclare(Publisher.QUEUE_CON, false, false, false, null);

        //4. 监听消息
        DefaultConsumer callback = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者获取到消息:" + new String(body, "UTF-8"));

                //5. 开始返回消息
                String respQueueName = properties.getReplyTo();
                AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", respQueueName, props, "hi publisher".getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名 是否自动ack 监听回调
        channel.basicConsume(Publisher.QUEUE_PUB, false, callback);

        System.in.read();
    }

}

运行结果:

项目地址:

https://gitee.com/aruba/rabbit-mqstudy

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-07-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、简介
    • 1. AMQP
      • 2. RabbitMQ
      • 二、安装
        • 1. Erlang安装
          • 2. RabbitMQ安装
          • 三、账号管理
            • 1. 创建账号
              • 2. 授予角色
                • 3. 账号授权
                  • 4. 登录
                  • 四、RabbitMQ架构
                  • 五、RabbitMQ通讯方式
                    • 1. Hello World!
                      • 2. Work queues
                        • 2.1 轮询消费
                        • 2.2 饿汉消费
                      • 3. Publish/Subscribe
                        • 4. Routing
                          • 5. Topic
                            • 6. RPC
                              • 项目地址:
                              相关产品与服务
                              消息队列 CMQ 版
                              消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档