首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >RabbitMQ工作队列之公平分发消息与消息应答(ACK)

RabbitMQ工作队列之公平分发消息与消息应答(ACK)

作者头像
一个程序员的成长
发布2020-11-25 14:52:50
发布2020-11-25 14:52:50
93900
代码可运行
举报
文章被收录于专栏:bingfeng-技术bingfeng-技术
运行总次数:0
代码可运行

上篇文章中,我们讲了工作队列轮询的分发模式,该模式无论有多少个消费者,不管每个消费者处理消息的效率,都会将所有消息平均的分发给每一个消费者,也就是说,大家最后各自消费的消息数量都是一样多的。由此也就引发我们今天要介绍的公平分发模式。

消息应答(ACK)

消息丢失

我们之前的所有代码,如果消息队列将消息分发给消费者,那么就会从队列中删除,如果在我们处理任务的过程中,处理失败或者服务器宕机,那么这条消息肯定得不到执行,就会出现丢失。

我们所设想的如果任务在处理的过程中,如果服务器宕机等原因造成消息未被正常消费,那么必须分发给其他的消费者再次进行消费,这样及时服务器宕机也不会丢失任何的消息了。

ACK

所以ACK,就是消息应答机制,我们之前写的代码都是开启了自动应答,所以如果我们的消息没被正常消费,就会丢失。

要想确保消息不丢失,就必须将ACK自动应答关闭掉,在我们处理消息的流程中,如果消息正常被处理,那么最后进行手动应答,告诉队列我们正常消费了消息。

超时

RabbitMQ它是没有我们平常所见到的超时时间限制的,只要当消费者服务宕机,消息才会被重新分发,哪怕处理这条消息需要花费很长的时间。

公平分发模式

缺陷

我们提供多个消费者,目的就是为了提高系统的性能,提升系统处理任务的速度,如果将消息平均的分发给每个消费者,那么处理消息快的服务是不是会空闲下来,而处理慢的服务可能会阻塞等待处理,这样的场景是我们不愿意看到的。所以有了今天要说的分发模式,公平分发

能者多劳

所谓的公平分发,其实用能者多劳描述更为贴切,根据名字就可以知道,谁有能力处理更多的任务,那么就交给谁处理,防止消息的挤压。

那么想要实现公平分发,那么必须要将自动应答改为手动应答。这是公平分发的前提。

代理

消息生产者

代码语言:javascript
代码运行次数:0
运行
复制
public class Send {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        // 获取连接
        Connection connection = MQConnectUtil.getConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 10; i++) {

            String msg = "消息:" + i;

            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            Thread.sleep(i * 20);

            System.out.println(msg);
        }

        channel.close();
        connection.close();
    }
}

消费者1

我们在消费者中设置了channel.basicQos(1);这样一个参数,这个意思就是表示,此消费者每次最多只接收一条消息进行处理,只有将消息处理结束,手动应答之后,下一条消息才会被分发进来。

代码语言:javascript
代码运行次数:0
运行
复制
public class Consumer1 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 获取连接
        Connection connection = MQConnectUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 一次仅接受一条未经确认的消息
        channel.basicQos(1);

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消费者[1]-内容:" + msg);

                Thread.sleep(2 * 1000);

                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列,将自动应答方式改为false,关闭自动应答机制
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费者2

代码语言:javascript
代码运行次数:0
运行
复制
public class Consumer2 {

    public static final String QUEUE_NAME = "test_word_queue";

    public static void main(String[] args) throws Exception {

        // 获取连接
        Connection connection = MQConnectUtil.getConnection();

        // 创建频道
        Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);

        // 定义消费者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @SneakyThrows
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String msg = new String(body, StandardCharsets.UTF_8);

                System.out.println("消费者[2]-内容:" + msg);

                Thread.sleep(1000);

                // 手动回执消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列,需要将自动应答方式改为false
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

消费结果

那么结果就会像我们之前预想的那样,由于消费者2消费消息花费的时间比消费者1更少,所以消费者2处理的消息的数量要比消费者1处理的消息的数量要多。这里我就不贴图了,大家可以敲代码进行尝试。


今天的文章到这里就结束了,下篇呢,会给介绍介绍另外一种模式,发布订阅模式

日拱一卒,功不唐捐

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 一个程序员的成长 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消息应答(ACK)
    • 消息丢失
    • ACK
    • 超时
  • 公平分发模式
    • 缺陷
    • 能者多劳
  • 代理
    • 消息生产者
    • 消费者1
    • 消费者2
    • 消费结果
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档