首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >15-RabbitMQ高级特性-消费端限流

15-RabbitMQ高级特性-消费端限流

作者头像
彼岸舞
发布2022-10-06 08:39:53
3490
发布2022-10-06 08:39:53
举报

消费端限流

什么是消费端限流

  • 假设一个场景, 首先, 我们RabbitMQ服务器有上万条消息未处理的消息, 我们随机打开一个消费者客户端, 会出现下面情况
    • 巨量的消息瞬间全部推送过来, 但是我们单个客户端无法同时处理这么多数据
  • RabbitMQ提供了一种Qos(服务质量保证)功能, 即在非自动确认消息的前提下, 如果一定数目的消息(通过基于consumer或者channel设置Qos的值)未被确认前, 不进行消费新的消息
  • void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
  • 参数解释
    • prefetchSize: 0
    • prefetchCount: 会告诉RabbitMQ不要同时给一个消费者推送多余N个消息, 即一旦有N个消息还没有ACK, 则该consumer将block掉, 直到有消息ACK
    • global: true\false 是否将上面设置应用于channel, 简单点说, 就是上面限制是channel级别还是consumer级别
  • 注意
    • prefetchSize和global这两项, rabbitmq没有实现, 暂且不研究
    • prefetch_count和no_ack=false的情况下生效, 即在自动应答的情况下这两个值是不生效的

消费端限流代码实现

帮助类新增函数

public static AMQP.Queue.DeclareOk queueDeclare(Channel channel, String queueName, boolean durable) throws IOException {
    return channel.queueDeclare(queueName, durable, false, false, null);
}

消费者

package com.dance.redis.mq.rabbit.qos;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
 
public class Receiver {
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String queueName = "test001";  
        //    durable 是否持久化消息
        RabbitMQHelper.queueDeclare(channel,queueName,true);
        channel.basicQos(0, 1, false);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 参数:队列名称、是否自动ACK、Consumer
        channel.basicConsume(queueName, false, consumer);
        // 等待回调函数执行完毕之后,关闭资源。
        TimeUnit.SECONDS.sleep(50);
        channel.close();
        RabbitMQHelper.closeConnection();
    }
}

生产者

package com.dance.redis.mq.rabbit.qos;

import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class Sender {


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQHelper.getChannel();
        String queueName = "test001";
        RabbitMQHelper.queueDeclare(channel, queueName, true);
        Map<String, Object> headers = new HashMap<>();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .headers(headers).build();
        for (int i = 0; i < 5; i++) {
            String msg = "Hello World RabbitMQ " + i;
            channel.basicPublish("", queueName, props, msg.getBytes());
        }
    }

}

测试

启动消费者

启动生产者

查看消费者

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 消费端限流
    • 什么是消费端限流
      • 消费端限流代码实现
        • 测试
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档