前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ的队列模式你真的懂吗?

RabbitMQ的队列模式你真的懂吗?

原创
作者头像
JavaEdge
发布2024-09-11 23:21:22
2920
发布2024-09-11 23:21:22
举报
文章被收录于专栏:Java

0 前言

官网描述六类工作队列模式:

  1. 简单队列模式:最简单的工作队列,一个消息生产者,一个消息消费者,一个队列。另称点对点模式
  2. 工作模式:一个消息生产者,一个交换器,一个消息队列,多个消费者。也称点对点模式
  3. 发布/订阅模式:无选择接收消息,一个消息生产者,一个交换器,多个消息队列,多个消费者
  4. 路由模式:基于发布/订阅模式,有选择的接收消息,即通过 routing 路由进行匹配条件是否满足接收消息
  5. 主题模式:同样是在发布/订阅模式的基础上,根据主题匹配进行筛选是否接收消息,比第四类更灵活
  6. RPC模式:拥有请求/回复的。也就是有响应的,这是其它都没的

1 简单队列模式

1 实现功能

一个生产者 P 发送消息到队列 Q,一个消费者 C 接收:

Pro

Pro负责创建消息队列,并发送消息入列:

  1. 获取连接
  2. 创建通道
  3. 创建队列声明
  4. 发送消息
  5. 关闭队列
代码语言:csharp
复制
public class Producer {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "我是生产者生成的消息";
        System.out.println("生产者发送消息:" + msg);
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        channel.close();
        newConnection.close();
    }
}

Con

  1. 获取连接
  2. 获取通道
  3. 监听队列
代码语言:csharp
复制
public class Customer {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("002");
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
创建vhost

2 工作队列模式

将耗时的任务分发给多个消费者(工作者)。

主要解决:处理资源密集型任务,且还要等他完成。有了工作队列,就可将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可取出消息并完成工作。若启动了多个工作进程,则工作就可在多个进程间共享。

工作队列也称公平性队列模式,循环分发,若有两个消费者,默认RabbitMQ按序将每条消息发给下一个 Con,每个消费者获得相同数量的消息,即轮询。

Pro

创建50个消息

代码语言:csharp
复制
public class Producer2 {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection newConnection = MQConnectionUtils.newConnection();
        Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         /**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
        channel.basicQos(1);
        for (int i = 1; i <= 50; i++) {
            String msg = "生产者消息_" + i;
            System.out.println("生产者发送消息:" + msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        channel.close();
        newConnection.close();
    }
}

Con

代码语言:csharp
复制
public class Customer2_1 {

    private static final String QUEUE_NAME = "test_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("001");
        Connection newConnection = MQConnectionUtils.newConnection();
        final Channel channel = newConnection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msgString = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msgString);
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}

循环分发

启动生产者

启动两个消费者

Pro发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

公平分发

由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。如一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,咋办?

发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似UDP,面向无连接。可用 basicQos,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

代码语言:php
复制
final Channel channel = newConnection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
channel.basicQos(1);

消息持久化

背景

上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。咋办?

参数配置

参数配置一:生产者创建队列声明时,修改第二个参数为 true

代码语言:csharp
复制
/**3.创建队列声明 */
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

代码语言:csharp
复制
for (int i = 1; i <= 50; i++) {
    String msg = "生产者消息_" + i;
    System.out.println("生产者发送消息:" + msg);
    channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
}

小结

  • 循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息
  • 消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会丢失
  • 公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题

3 发布订阅模式

工作队列模式是直接在生产者与消费者里声明好一个队列,消息就只会对应同类型的消费者。这种只处理同种类型的消息有弊端。

3.1 案例

门户网站,用户注册完后一般都会发送消息通知用户注册结果。如在一个系统中,用户注册信息有邮箱、手机号,在注册完后会向邮箱和手机号都发送注册完成信息。

利用 MQ 实现业务异步处理,若用工作队列,就声明一个注册信息队列。注册完成后生产者向队列提交一条注册数据,消费者取出数据同时向邮箱以及手机号发送两条消息。但实际上邮箱和手机号信息发送实际上是不同的业务逻辑,不应放在一块处理。

这时就可利用发布/订阅模式将消息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱、手机),并绑定到交换机。这样生产者只需要发布一次消息,两个队列都会接收到消息发给对应的消费者:

只需简单的将队列绑定到交换机。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列。就像子网广播,每台子网内的主机都获得一份复制的消息。

3.2 啥是发布订阅模式

可将消息发送给不同类型的消费者。即发布一次,消费多个:

X表示交换机、红色表示队列。

展示邮件、短信的例子,通过绑定到一个交换机,但是

3.3 实战

代码语言:csharp
复制
public class ProducerFanout {

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        /** 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /** 2.创建通道 */
        Channel channel = connection.createChannel();
        /** 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 */
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        /** 4.发送消息 */
        for (int i = 0; i < 10; i++)
        {
            String message = "用户注册消息:" + i;
            System.out.println("[send]:" + message);
          	// 第二个参数为空类似于表示全局广播,只要绑定到该队列上的消费者理论上是都可收到
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
            try {
                Thread.sleep(5 * i);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        /** 5.关闭通道、连接 */
        channel.close();
        connection.close();
        /** 注意:如果消费没有绑定交换机和队列,则消息会丢失 */
    }
}

邮件消费者

代码语言:java
复制
public class ConsumerEmailFanout {

    private static final String QUEUE_NAME = "consumerFanout_email";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
      	// 第三个参数置为空时,可以接收到生产者所有的消息(生产者 routingKey 参数为空时)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
短信消费者
代码语言:java
复制
public class ConsumerSMSFanout {

    private static final String QUEUE_NAME = "ConsumerFanout_sms";
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者启动");
        /* 1.创建新的连接 */
        Connection connection = MQConnectionUtils.newConnection();
        /* 2.创建通道 */
        Channel channel = connection.createChannel();
        /* 3.消费者关联队列 */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        /* 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey */
      	// 第三个参数置为空时,可接收到生产者所有的消息(生产者 routingKey 参数为空时)
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取生产者消息:" + msg);
            }
        };
        /* 5.消费者监听队列消息 */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
运行

先运行两个con,再运行pro。如没有提前将队列绑定到交换机,直接运行pro,消息是不会发到任何队列里的。

生产者

短信消费者

邮件消费者

小结

相比工作模式,发布订阅模式引入了交换机,类型上更灵活。

pro不是直接操作队列,而是将数据发给交换机,由交换机将数据发给与之绑定的队列。从不加特定参数的运行结果中可以看到,两种类型的消费者(email,sms)都收到相同数量消息。

必须声明交换机,并设置模式:channel.exchangeDeclare(EXCHANGE_NAME, "fanout"),fanout 指分发模式(将每一条消息都发送到与交换机绑定的队列)

队列必须绑定交换机:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

生产者发送消息到交换机,多个消费者声明多个队列,与交换机进行绑定,队列中的消息可以被所有消费者消费,类似QQ群消息

4 路由模式

就是发布订阅模式(Publish/Subscribe Pattern)中的直连交换机(Direct Exchange)。一种基于路由键(Routing Key)来路由消息的模式。在这种模式下,生产者发送消息时会指定一个路由键,交换机会根据这个路由键将消息路由到与之匹配的队列。

Pro

使用 channel.basicPublish 方法发送消息,并指定交换机名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

Con

在消费者代码中,我们声明了一个直接交换机(direct 类型),并绑定了一个队列。在绑定队列时,我们使用 channel.queueBind 方法,并指定交换机名称、队列名称和路由键。交换机会根据路由键将消息路由到与之匹配的队列。

特点

  • 路由键匹配:消息的路由键必须与队列绑定的路由键完全匹配,才能将消息路由到该队列。
  • 直接交换机:直接交换机根据路由键进行精确匹配,适用于需要精确控制消息路由的场景。

通过这种方式,路由模式可以实现基于路由键的精确消息路由,适用于需要将消息发送到特定队列的场景。

5 主题模式

属于发布订阅模式的TopicExchange(主题交换机)。Queue 通过 routing key 绑定到 TopicExchange,当消息到达TopicExchange后,TopicEkchange 根据消息的 routing key 将消息路由到一个或者多个Queue。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0 前言
  • 1 简单队列模式
    • 1 实现功能
      • Pro
        • Con
          • 创建vhost
      • 2 工作队列模式
        • Pro
          • Con
            • 循环分发
              • 公平分发
                • 消息持久化
                  • 背景
                  • 参数配置
                • 小结
                • 3 发布订阅模式
                  • 3.1 案例
                    • 3.2 啥是发布订阅模式
                      • 3.3 实战
                        • 短信消费者
                        • 运行
                      • 小结
                      • 4 路由模式
                        • Pro
                          • Con
                            • 特点
                            • 5 主题模式
                            相关产品与服务
                            消息队列
                            腾讯云消息队列 TDMQ 是分布式架构中的重要组件,提供异步通信的基础能力,通过应用解耦降低系统复杂度,提升系统可用性和可扩展性。TDMQ 产品系列提供丰富的产品形态,包含 CKafka、RocketMQ、RabbitMQ、Pulsar、CMQ 五大产品,覆盖在线和离线场景,满足金融、互联网、教育、物流、能源等不同行业和场景的需求。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档