【RabbitMq 篇五】-要点概念(优先级、顺序性、消息分发、持久化)

前言

本文介绍RabbitMq几个重要的概念。分别是优先级队列、消息顺序性、消息分发、持久化

正文

目录

前言

正文

优先级队列

消息顺序性

消息分发

持久化


优先级队列

顾名思义,优先级高的具备优先消费的特权

设置方式是在声明队列的时候设置参数:x-max-priority,代表最大优先级,如果参数设置10,如代码所示:

@Bean
    public Queue priorityQueue() {
        Map<String, Object> map = new HashMap<>();
        //给当前队列配置最大优先级
        map.put("x-max-priority", 10);
        return new Queue(PRIORITY_QUEUE, true, false, false, map);
    }

那么则表示在该队列存在11个级别,从高到低是 10,9,……0。

并且需要在发送消息的时候设置当前消息的优先级,如代码所示:

amqpTemplate.convertAndSend("priority_exchange", "", order, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setPriority(finalI);
                    return message;
                }
            });

需要注意的是,需要在生产者的速度大于消费者的速度,且Broker中有消息堆积的情况下,设置队列及消息优先级才有意义,否则,生产者刚生产一条就被消费者消费了,消息在Broker没有堆积,何谈优先级。

消息顺序性

RabbitMq没有属性设置消息的顺序性,所以在没有前提的情况下说RabbitMq消息的消费具有顺序性是错误的,理想情况下,没有重复消费前提下,生产者发送 msg1、msg2、msg3,那么消费者消费顺序也是msg1、msg2、msg3。

但是这种情况毕竟是理想的,而这种理想情况在实际中很容易会被打破,例如消息丢失,网络原因,异常发生,而且也是在一个生产者和一个消费这的情况,如果多个生产者的话,真的就无法保证哪个消息先到达Broker,也就不能保证顺序。

例举一下情况,打破消费的顺序性。

生产者使用了事务,且触发了回滚,重新补发消息后,顺序可能是错乱的。

开启publisher confirm后出现超时、中断、拒绝、nack命令等,重新补发消息后,顺序可能是错乱的。

消息分发

RocketMq有多个消费者的时候,队列会以轮询的方式分发给多个消费者。

这里有一个很重要的参数 channel.basicQos(),该方法是允许信道上消费者最大未确认消息数量。他是针对信道而言的,一个连接可以有多个信道,一个信道可以有多个队列。

channel.basicQos()参数只适用于推模式的消费方式。

举个例子,channel.basicQos(5),代表该信道上的其中一个消费者未确认数量达到5后,RabbitMq就不会向这个消费者在发送任何消息,直到该消费者确认了一个消息后计数器减1,之后才可以继续接收消息。

该参数有3个重载方法:

void basicQos(int var1, int var2, boolean var3) throws IOException;

 void basicQos(int var1, boolean var2) throws IOException;

 void basicQos(int var1) throws IOException;

我们上面说的那个数字就是该方法的第一个参数 var1,在他的实现类 AutorecoveringChannel 里参数名叫 prefetchCount

如果使用 basicQos(int var1),var1代表消费者所能接收未确认消息总数,写0 代表没有上限。

如果使用 basicQos(int var1, int var2, boolean var3),在他的实现类里实现如下,仔细看他的参数顺序。

public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
        if (global) {
            this.prefetchCountGlobal = prefetchCount;
        } else {
            this.prefetchCountConsumer = prefetchCount;
        }

        this.delegate.basicQos(prefetchSize, prefetchCount, global);
    }

prefetchSize参数是消费者所能接收未确认消息的总大小,单位是B,同样,设置0表示无上限。

global参数含义如下

global

Meaning of prefetch_count in AMQP 0-9-1

Meaning of prefetch_count in RabbitMQ

false

信道上的消息都要遵循prefetchCount值

信道上新的消费者需要遵循prefetchCount值

true

当前连接上所有消费者都要遵循prefetchCount值

信道上的消息都要遵循prefetchCount值

那么我们怎么使用呢?

channel.basicQos(10);

channel.basicConsume("my-queue1",false,consumer1);

channel.basicConsume("my-queue2",false,consumer2);

这样写是说my-queue1和my-queue2两个队列里最大的未确认消息总是分别都是10.

再来一种复杂的写法

channel.basicQos(3,false);
channel.basicQos(5,true);
channel.basicConsume("my-queue1",false,consumer1);

channel.basicConsume("my-queue2",false,consumer2);

含义是:每个消费者最多接收3个未确认的消息,当前这个信道最大可接收5个未确认的消息。

设置为true 指的是同一个连接范围内所有信道上未确认数量之和。

设置为false指的是每个信道上每个消费者最大未确认数量。

RabbitMq计算这些数量也是需要占用性能的,所以我们一般指定的false,默认也是false。

关于消息分发的概念理解起来还是比较复杂的,该功能也是在特殊需要时才会使用,一般的我们没必要设置这个参数,而且目前我没有找到Spring集成的模版中直接可以操作这个参数,所以只能用原始的方式在channel 声明。

持久化

持久化概念是一个比较重要也是比较好理解的,而且我们在使用的时候也会经常碰到。

首先,持久化的意思跟其他中间件的持久化概念基本相同,都是保证数据的可靠性,防止丢失的操作。

在RabbitMq中存在三个部分的持久化,分别是:交换器持久化、队列持久化、消息持久化。

无论是那个阶段的持久化,参数都是 durable ,设置 true代表持久化,false不持久化

交换器持久化

在使用的时候我们无需指定,因为构造方法里面默认给了true

 @Bean
    FanoutExchange goodsChangeExchange() {
        return new FanoutExchange("GOODS");// 配置广播路由器
    }

 public AbstractExchange(String name) {
        this(name, true, false);
    }

队列持久化

队列和消息的持久化,如果只设置其中一个是没有意义的,因为消息存在队列里,如果消息设置持久化,队列没有,那么队列丢失,消息也会丢失,有句话说,毛长在皮上,那皮都没有了毛还能有吗。反过来,设置了队列持久化同样也不能保证消息不能丢失。因为,两者必须同时存在才有意义,当然设置了持久化后也会消耗性能的。

这是Queue.class 部分代码,我们不设置持久化,默认是持久化的,所以我们不用指定也是可以的。

public Queue(String name) {
        this(name, true, false, false);
    }

消息持久化

消息持久化有两种写法,分别对应的枚举类的两个方法。

amqpTemplate.convertAndSend("priority_exchange", "", order, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setPriority(finalI);
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.fromInt(2));
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }
            });




-----------------------------------------------


public enum MessageDeliveryMode {
    NON_PERSISTENT,
    PERSISTENT;

    private MessageDeliveryMode() {
    }

    public static int toInt(MessageDeliveryMode mode) {
        switch(mode) {
        case NON_PERSISTENT:
            return 1;
        case PERSISTENT:
            return 2;
        default:
            return -1;
        }
    }

    public static MessageDeliveryMode fromInt(int modeAsNumber) {
        switch(modeAsNumber) {
        case 1:
            return NON_PERSISTENT;
        case 2:
            return PERSISTENT;
        default:
            return null;
        }
    }
}

同样,我们也不需要设置消息的持久化,因为默认就是持久化的,大家自行搜索这个类 MessageProperties.class

我截取了部分代码。

public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;

 public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }


 static {
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = Integer.valueOf(0);
    }

以上我们把交换器、队列、消息都设置了持久化,那么就能保证消息百分之百不丢失吗?答案是否定的。

在这里透露一下,下一节我们就会介绍比持久化可靠性更高的方式来保证消息不被丢失。

可以提示一下,一种是事务,一种是生产者确认方式。

想看更多内容请关注!

代码地址:https://github.com/362460453/rabbitMQ-demo

本文分享自微信公众号 - 晏霖(yanlin199507)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-06-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏老男孩成长之路

RabbitMQ如何通过持久化保证消息99.99%不丢失?

要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange 交换器,Queue 队列,Messag...

9510
来自专栏老男孩成长之路

RabbitMQ如何保证队列里的消息99.99%被消费?

其实,还有1种场景需要考虑:当消费者接收到消息后,还没处理完业务逻辑,消费者挂掉了,那消息也算丢失了?,比如用户下单,订单中心发送了1个消息到RabbitMQ里...

16150
来自专栏慕容千语的架构笔记

Java消息队列总结只需一篇ActiveMQ、RabbitMQ、ZeroMQ、Kafka

消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有Acti...

13620
来自专栏Java架构师进阶

高并发场景下,如何保证生产者投递到消息中间件的消息不丢失?

如果投递出去的消息在网络传输过程中丢失,或者在RabbitMQ的内存中还没写入磁盘的时候宕机,都会导致生产端投递到MQ的数据丢失。

12220
来自专栏跟着阿笨一起玩NET

C#基于RabbitMQ实现客户端之间消息通讯实战演练

21230
来自专栏以Java架构赢天下

RabbitMQ高级特性消费端限流策略实现

举一些我们平常生活中的消费场景,例如:火车票、机票、门票等,通常来说这些服务在下单之后,后续的出票结果都是异步通知的,如果服务本身只支持每秒1000访问量,由于...

9230
来自专栏程序员小明

消息中间件(一):3种主流的MQ对比

消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

53820
来自专栏Java研发军团

自己工作环境没机会接触高并发、分布式怎么办?

面试总会遇到一些关系高并发、分布式的问题,可是自己工作中不接触,自学又不深入,这可怎么办?

17030
来自专栏老男孩成长之路

RabbitMQ如何保证消息99.99%被发送成功?

要想保证消息不丢失,首先我们得保证生产者能成功的将消息发送到RabbitMQ服务器。

17730
来自专栏菲宇

Django中Celery的实现介绍(一)

Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

45720

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励