前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ优先级队列机制(八)

RabbitMQ优先级队列机制(八)

作者头像
无涯WuYa
发布2022-03-29 16:06:12
3670
发布2022-03-29 16:06:12
举报
文章被收录于专栏:Python自动化测试

一、什么是优先级队列

在服务级级别的测试中需要考虑被执行任务的优先级机制,也就是通过线程优先级来进行,设置优先级的目的是在资源非常紧张的情况下,让优先级高的任务优先执行,而优先级低的任务排后执行,当然这样的一种设置机制只能是异步的模式下执行,如果是设计在同步的模式下执行,那这个设计从系统上来说就缺少宏观维度的思考。在RabbitMQ的机制中也是提供了队列的优先级机制,这样设计的目的也是在在生产者生产过快,而消费者消费不过来的情况下,也就是资源在紧张或者说是在有限的情况下,设置的队列优先级高的任务它的消息优先进行消费,而优先级低的消息排后消费。当然,如果是在资源不紧张的情况下,设置优先级其实没多大的意义,因为这个时候优先过来的消息先进行消费,也谈不上排队的机制和优先级的机制。

二、优先级的实现机制

针对优先级的设置,在消费者端进行设置,参数具体是x-max-priority,涉及的代码具体如下:

代码语言:javascript
复制
            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);
            channel.queueDeclare(queueName,true,false,false,arguments);

这样消费者的代码执行后,在RabbitMQ的WEB控制台,就可以看到该消息队列显示设置的优先级,具体如下所示:

如上,我们演示了配置一个队列的最大优先级,其实核心的是需要在生产者发送消息的时候设置当前发送任务的优先级涉及代码如下:

代码语言:javascript
复制
               AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());

在如上中,我们设置的发送任务的优先级是8。

三、优先级队列实战代码

3.1、生产者代码
代码语言:javascript
复制
package com.example.rabbitmq.priority;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ProducerPriority
{
    private  static  final  String exchangeName="test_priority_exchange";
    private  static  final  String routyKey="priority.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        for(int i=0;i<3;i++)
        {
            Map<String,Object>  headers=new HashMap<String,Object>();
            headers.put("num",i);

            if (i==0)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(8)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else if (i==1)
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(3)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }
            else
            {
                AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                        .deliveryMode(2)
                        .contentEncoding("UTF_8")
                        .headers(headers)
                        .priority(9)
                        .build();

                String msg = "Hello RabbitMQ priority Message"+i;
                channel.basicPublish(exchangeName,routyKey,true,properties,msg.getBytes());
            }


        }

    }
}
代码语言:javascript
复制
在如上中,我们针对发送的任务依据编号进行了优先级的设置。
3.2、消费者代码
代码语言:javascript
复制
package com.example.rabbitmq.priority;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ConsumerPriority
{
    private static final String exchangeName = "test_priority_exchange";
    private  static  final String queueName="test_priority_queue";
    private  static  final  String routingKey="priority.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();

            //设置优先级
            Map<String,Object>  arguments =new HashMap<String,Object>();
            arguments.put("x-max-prioroty",10);

            channel.exchangeDeclare(exchangeName,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);

            channel.basicConsume(queueName,true,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
3.3、MyConsumer类代码
代码语言:javascript
复制
package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("the message received:"+new String(body));
        System.err.println("message priority:"+properties.getPriority());

    }
}
3.4、执行结果信息

如上的代码执行后,当然当前的资源不存在紧张的情况,那么就会按正常的顺序消费,具体输出结果如下:

如上,主要总结了消息队列优先级这部分的总结和它的案例应用。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、什么是优先级队列
  • 二、优先级的实现机制
  • 三、优先级队列实战代码
    • 3.1、生产者代码
      • 3.2、消费者代码
        • 3.3、MyConsumer类代码
          • 3.4、执行结果信息
          相关产品与服务
          消息队列 CMQ 版
          消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档