前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 学习(四)---- 工作队列模式

RabbitMQ 学习(四)---- 工作队列模式

作者头像
RAIN7
发布2022-10-04 20:23:11
5240
发布2022-10-04 20:23:11
举报

文章目录

RabbitMQ 学习(四)---- 工作队列模式

  这是第二种模型 (Work Queue),任务模型,当消息处理比较耗时的时候,生产者发送消息的速度远远大于消费的速度,长此以往,消息就会堆积的越来越多,无法及时处理,可以使用work模型,让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

(1)公平竞争机制

  一个生产者发送消息到默认交换机,通过路由同名规则将 队列中的信息 循环分发到 监听队列的消费者中,一对多,不过是公平分发,按照顺序将每条消息发送给每一个消费者。每个消费者平均分配队列中的消息。公平竞争。

在这里插入图片描述
在这里插入图片描述

  这里的生产者代码与 之前简单模型,一模一样,只不过循环发送了多条消息等待分配

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgProvider {
    public static void main(String[] args) {
        Connection connection =null;
        Channel channel =null;
        try {
            // 1、获取连接对象
            connection = RabbitMQUtils.getConnect();

            //2、通过连接获取信道
            assert connection != null;
            channel = connection.createChannel();

            // 声明发送的消息
            String message = "work平均分配生产的消息!";

            //3、声明队列信息
            channel.queueDeclare("work", false, false, false, null);

            for (int i = 1; i <= 10; i++) {
                //4、使用信道发送消息, routineKey与队列同名方便匹配
                channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel,connection);
        }
    }
}

  创建多个消费者,与之前简单模型一模一样,只不过创建了多个连接到 rabbitMq的服务器上对 队列进行监听

消费者1

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgCustomer1 {
    public static void main(String[] args) {

        try {

            Connection connection = RabbitMQUtils.getConnect();
            Channel channel = connection.createChannel();

            // 声明队列
            channel.queueDeclare("work", false, false, false, null);

            // 该消费者1 接收队列中的消息
            channel.basicConsume("work", true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

消费者2

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgCustomer2 {
    public static void main(String[] args) {

        try {

            Connection connection = RabbitMQUtils.getConnect();
            Channel channel = connection.createChannel();

            // 声明队列
            channel.queueDeclare("work", false, false, false, null);

            // 该消费者1 接收队列中的消息
            channel.basicConsume("work", true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(new String(body));
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

消费者1 收到的消息情况,全部是奇数

在这里插入图片描述
在这里插入图片描述

消费者2 收到的消息的情况,全部都是偶数

在这里插入图片描述
在这里插入图片描述

可以证明,监听队列的消费者中间一次循环接收队列中的消息,公平竞争

(2)能者多劳机制

我们需要了解消费者的自动确认机制

  默认情况下,RabbitMQ会按照顺序将每个消息发送到下一个使用者,平均每个消费者会受到相同数量的消息,这种分发消息的机制称为循环。

而为什么会自动平均分配呢?

  因为我们在消费者消费的时候有一个参数设置为 autoAck:true,我们设置消费者接收消息自动确认,而一般都是队列中分配好了那个消费者要传递什么信息,直接一次全部传递过去,不是消费一个确认一下。消费者接收到所有消息之后自动确认,队列中会标记删除所有的信息,他不关心你接收完信息之后的后续业务操作。就是只关心你是否收到了数据。

  消息自动确认机制,完成一项任务可能需要几秒钟甚至几分钟,如果一个消费者开始了一项长期的任务,却只完成了一部分就挂了,那么RabbitMQ一旦将消息传递给消费者,就会立刻标记删除,那么因为消费者挂了,接收数据的时候已经确认应答了,队列中的数据也删除了,所以剩余接收到的信息也没了。

autoAck 取消 ,手动确认

  如果生产者发送10条消息,消费者1拿到5条,消费者2拿到5条,不进行自动应答,服务器队列的数据即使消费了,我们没有应答就不会被标记删除,保证服务器队列中的数据一直还在。如果消费者处理完了这条数据,那么手动确认,队列中知道已经确认了进行删除

接收消息的时候,参数设置为false

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SGXb3SYK-1663644288424)(C:\Users\rain7\AppData\Roaming\Typora\typora-user-images\1662977824211.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SGXb3SYK-1663644288424)(C:\Users\rain7\AppData\Roaming\Typora\typora-user-images\1662977824211.png)]

执行回调函数的时候,手动确认

在这里,我们在 handlerDeliver 方法中,

  • 第一个参数 deliveryTag,代表本次信道传递的标签标识,Envelope 类的实例包含了 本次传递的 deliverTag数据可直接获取
  • 第二个参数 multiple:批量

  比如批量确认.当multiple的值设置为true时,RabbitMQ将确认指定传输标签以及之前所有未被确认的消息。与单个确认相同,批量确认的作用域为每个通道。例如:通道Ch上有四个未被确认的消息,标签分别为5,6,7,8;当一个delivery_tag值为8并且multiple值为true的确认消息到达通道时,所有5到8的标签都会被确认。如果multiple值设置为false,标签为5,6,7的消息将不会被确认。

代码语言:javascript
复制
     // 手动确认
       channel.basicAck(envelope.getDeliveryTag(), false);

补充一点还有其他手动确认的API

在这里插入图片描述
在这里插入图片描述

chanel 传递1条数据

  在之前的公平竞争机制下,说是按照顺序给每一个消费者数据,其实在发送给消费者之前在内部已经计算好了,给消费者一第1、3、5、7、9数据,一次性发送,给消费者二第2、4、6、8、10条数据,然后一次性发送,如果我们手动确认的话,那么相当于一次确认一大批,队列经过确认后进行删除,如果在后续处理业务中挂掉了,照样消息已经删除了。

  我们设置一个信道每次只能消费一个消息,如果其中一个消费者服务器挂掉了,连接就断掉了,剩余的未被手动确认的数据还在队列中保存。也能及时得把剩余的消息继续交给消费者2进行处理,不耽误业务的持续进行。

  这就是能者多劳的机制。就是说处理快的消费者处理完业务会很快的手动确认,然后再次进行接收新的消息,处理慢的消费者经过一段时间处理之后再进行确认,就会能者多劳,业务处理快的接受的消息多,处理满的接受的少

在消费消息之前设置信道中接收消息只能是1个

代码语言:javascript
复制
chanel.basicQos(1); // 设置信道中一次只能消费一个信息

(3) 能者多劳的代码案例

1、生产者

生产者发送给队列 10条消息

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgProvider {
    public static void main(String[] args) {
        Connection connection =null;
        Channel channel =null;
        try {
            // 1、获取连接对象
            connection = RabbitMQUtils.getConnect();

            //2、通过连接获取信道
            assert connection != null;
            channel = connection.createChannel();

            // 声明发送的消息
            String message = "work平均分配生产的消息!";

            //3、声明队列信息
            channel.queueDeclare("work", false, false, false, null);

            for (int i = 1; i <= 10; i++) {
                //4、使用信道发送消息, routineKey与队列同名方便匹配
                channel.basicPublish("", "work", MessageProperties.PERSISTENT_TEXT_PLAIN, (message+": "+i).getBytes());
            }

        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            RabbitMQUtils.close(channel,connection);
        }
    }
}

2、消费者1

  接收到消息之后,会休眠2秒,在进行业务操作,作为处理较慢的消费者,设置信道每次传递一个,处理完业务手动确认ack。

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgCustomer1 {
    public static void main(String[] args) {

        try {

            Connection connection = RabbitMQUtils.getConnect();
            final Channel channel = connection.createChannel();

            //声明信道中一次只能接受一条信息
            channel.basicQos(1);

            // 声明队列
            channel.queueDeclare("work", false, false, false, null);

            // 该消费者1 接收队列中的消息
            channel.basicConsume("work", false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 休眠2s
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 处理业务
                    System.out.println(new String(body));

                    //手动确认,删除队列中的信息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3、消费者2

  接收到消息之后,休眠1秒,作为消费较快的消费者,设置信道传递1条数据,处理完业务之后手动确认。

代码语言:javascript
复制
package workAver;

import com.rabbitmq.client.*;
import utils.RabbitMQUtils;

import java.io.IOException;

public class AvgCustomer2 {
    public static void main(String[] args) {

        try {

            Connection connection = RabbitMQUtils.getConnect();
            final Channel channel = connection.createChannel();

            //声明信道中一次只能接受一条信息
            channel.basicQos(1);

            // 声明队列
            channel.queueDeclare("work", false, false, false, null);

            // 该消费者1 接收队列中的消息
            channel.basicConsume("work", false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 休眠1秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 处理业务
                    System.out.println(new String(body));

                    // 手动确认
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            });


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

4、查看接收结果

消费者1,处理业务2s,处理了4条消息

在这里插入图片描述
在这里插入图片描述

消费者2,处理业务1s,处理了6条消息

在这里插入图片描述
在这里插入图片描述

实现了能者多劳机制

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-09-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • RabbitMQ 学习(四)---- 工作队列模式
  • (1)公平竞争机制
  • (2)能者多劳机制
    • autoAck 取消 ,手动确认
      • chanel 传递1条数据
      • (3) 能者多劳的代码案例
        • 1、生产者
          • 2、消费者1
            • 3、消费者2
              • 4、查看接收结果
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档