专栏首页shysh95RabbitMQ进阶使用

RabbitMQ进阶使用

本文主要讲解以下几点:

  1. 备份交换器
  2. 过期时间
  3. 私信队列
  4. 延迟队列
  5. 优先级队列
  6. RPC
  7. 持久化
  8. 生产者确认机制
  9. 消费者分发和传输保障

备份交换器

在入门使用曾提到过,生产者发送消息可以使用mandatory参数,该参数的作用主要是在交换器根据路由键无法匹配队列的时候讲消息返回给生产者,但是需要生产者通过ReturnListener来进行处理,这无疑增加了生产者客户端程序编写的复杂度。

为了避免上述问题,可以使用备份交换器(Alternate Exchange),或者称之为备胎交换器。

该交换器的作用主要是在消息无法被路由到队列时,可以路由到该交换器,再路由到备份交换器所绑定的队列(这里路由到队列的路由键与生产者发出的路由键是一样的)。

在进行交换器声明(exchangeDeclare)的时候,通过指定alternate-exchange参数来实现。备份交换器建议类型为fanout,这样可以避免再次路由还是无法找到匹配队列的问题。

总结一下备份交换器的情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ服务无异常,消息丢失
  • 如果备份交换器无绑定队列,客户端和RabbitMQ服务无异常,消息丢失
  • 如果备份交换器无匹配的队列,客户端和RabbitMQ服务无异常,消息丢失
  • mandatory和备份交换器一起使用,mandatory参数无效

过期时间(TTL)

RabbitMQ可以对队列和消息进行过期时间的设置。

消息TTL

TTL不设置,表示消息永久有效,设置为0,表示消息此时必须立即能投递给消费者,否则直接抛弃。

消息TTL的设置有两种方式:

  • 通过队列属性设置,该队列所有的消息有相同的过期时间
  • 通过消息属性设置,每个消息的过期时间都不相同

两种方法一起使用,过期时间TTL短的生效。消息在过期之后,就会成为死信,消费者将可能无法收到该消息(请注意此处的可能,为什么是可能,往后看就知道了)

通过队列属性设置消息的过期时间如下,依赖x-message-ttl参数的设置,单位为毫秒:

Map<String, Object> params = new HashMap<>(1);//设置消息过期时间为6秒params.put("x-message-ttl", "6000");channel.queueDeclare("order", true, false, false, params);

通过消息属性设置消息的过期时间如下,依赖expiration参数,单位为毫秒:

channel.basicPublish("order", "food.order",        new AMQP.BasicProperties.Builder()                .expiration("3000")                .build(),        "This is computer order".getBytes(Charset.forName("UTF-8")));

通过队列属性和消息属性设置的过期消息的处理方式有所不同:

  • 队列属性:消息一旦过期,RabbitMQ服务会直接删除消息
  • 消息属性:消息过期时,并不直接删除消息;当消息进行投递时才会进行过期时间的判断

上述处理方式的原因主要是:

  • 通过队列属性设置的消息过期时间均一致,此时过期的消息都在队列的头部,RabbitMQ只需要定期从队列头部扫描过期消息并删除。
  • 通过消息属性设置的消息过期时间存在很大差别,如果要删除消息将会扫描全部队列,从而降低RabbitMQ服务性能,因此选择在投递时进行判断,如果过期删除,不过期投递。

队列TTL

这里设置的过期时间指的是:在给定的TTL时间内,如果队列未使用,则将队列进行删除。通过在声明队列时指定x-expires参数来设置,单位毫秒。 这里的未使用主要指:

  • 队列上没有任何的消费者
  • 队列也没有被重新声明
  • 在过期时间内未调用过basicGet方法

RabbitMQ重启后,持久化的队列的过期时间会被重新计算。

死信队列

了解什么是死信队列之前,需要知道死信交换器(DLX, Dead-Letter-Exchange)。当消息在一个队列中变成死信时,该消息可能会被重发到另一个交换器,这个交换器就是所谓的死信交换器(DLX)。绑定死信交换器的队列则成为死信队列。

消息成为死信的原因:

  • 消息被拒绝(basicReject/basicNack),并且禁止重新入队(requeue参数设置为false)
  • 消息过期
  • 队列达到最大长度

给一个队列添加死信交换器主要依靠x-dead-letter-exchange参数,消息发送给死信交换器时的路由键可以通过x-dead-letter-routing-key来设置,如果不设置则使用原消息的路由键。

延迟队列

延迟消息是指生产者发送消息给RabbitMQ服务之后,并不想让消费者立刻消费消息,而是等待特定时间后再进行消费。

首先先确认一个点,AMQP协议和RabbitMQ是不支持延迟队列的。所谓的延迟队列时通过死信交换器(DLX)和TTL来实现的。

死信队列可以被看成延迟队列。

优先级队列

优先级高的队列具有优先权,优先级高的消息具备优先被消费的特权。

设置队列的优先级可以在声明队列时指定x-max-priority参数值来设置,消息优先级的设置最大不能超过队列的优先级,通过priority属性来设置。

优先级高的消息优先被消费是在消费者的消费能力远远小于生产者的生产能力产生消息堆积的情况下才有意义,如果没有消息堆积那么消息优先级毫无意义。

RPC

RPC的实现官网有DEMO,去官网看。 这里主要讲一下原理:利用RabbitMQ实现RPC主要依靠replyTo属性和correlationId的值

  1. replyTo:该参数主要用来设置回调队列
  2. correlationId:用来关联request和RPC调用后的response

使用correlationId该属性的原因是避免一次RPC请求就要创建一个回调队列,可以通过多个RPC复用一个回调队列,使用correlationId来获取request对应的response降低RabbitMQ的性能损耗。

下面是RPC的主要流程:

  1. 客户端向RabbitMQ一个队列(例如rpc)中发送消息,并且创建一个自定义的队列(amq.gen-G6gTPol66waTRPHPQjPKAA),将自定义队列的名称写入replyTo属性,然后生成这次请求的唯一ID,设置到correlationId属性中
  2. 发送消息到rpc队列
  3. 服务端订阅rpc队列,消费消息,完成处理逻辑之后,将结果发送至会调队列,并且将correlationId设置为请求的correlationId
  4. 客户端订阅自定义队列(amq.gen-G6gTPol66waTRPHPQjPKAA),等待并且消费结果,在消费时需要判断correlationId是否和响应中的correlationId一致,如果一致说明该响应就是本次请求的结果

持久化

  • 交换器持久化:开启交换器持久化,会自动保存交换器元数据信息落地磁盘,在RabbitMQ宕机重启时自动恢复交换器
  • 队列持久化:开启队列持久化,会自动保存队列的元数据信息落地磁盘,在RabbitMQ宕机重启时自动恢复队列
  • 消息持久化:开启消息持久化,自动保存消息内容落地磁盘,在RabbitMQ宕机重启时未被消费的信息会重新加载到队列中

总结一下:要想做到消息不丢失,必须开启消息持久化和队列持久化,交换器持久化可以不开启,只不过如果不开启交换器持久化,RabbitMQ宕机重启后无法再往该交换器发送消息

上面的方案并不可能完全保证消息不丢失,假设在消息写入队列却未落地磁盘之前,RabbitMQ宕机,此时还未落地磁盘的消息将会丢失。至于这种问题,需要依靠镜像队列来解决,后面讲述。

但是镜像队列也并不能完全保证消息的不丢失,只是降低丢失的概率,增强RabbitMQ的可靠性。

生产者确认机制

上述的持久化和之前的手动确认消息机制只是尽可能的保证消息在到达队列之后不丢失,但是却无法保证消息在发送至队列的途中丢失。为了解决上述问题,主要有以下两种解决方式:

  • 事务机制:不推荐使用,事务会严重降低RabbitMQ的性能
  • 发送方确认机制(publisher confirm)

事务机制

由于事务机制不推荐使用,这里就简单描述,不再深究。

  • channel.txSelect:用于将当前的信道置为事务模式
  • channel.txCommit:用于提交事务
  • channel.txRollback:用于事务回滚

发送方确认机制

主要有以下三种方式:

  • 普通确认机制:同步阻塞
  • 批量确认机制:降低同步阻塞范围
  • 异步确认机制:无阻塞,但编码稍微复杂,推荐使用该方式

普通确认和批量确认

Confirm.SelectOk confirmSelect() throws IOException;

上面的方法用来将信道置为发送方确认模式。如果信道没有开启publisher confirm模式,则调用任何waitForConfirms方法都会报出java.lang.IllegalStateException。

boolean waitForConfirms() throws InterruptedException;boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

上面的方法返回条件是客户端收到了对应的Basic.Ack/.Nack消息,timeout表示超时时间。

void waitForConfirmsOrDie() throws IOException, InterruptedException;void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

上面的方法接收到RabbitMQ返回的Basic.Nack之后会抛出java.io.IOException,timeout表示超时时间。

批量确认的逻辑无非就是批量发送一批消息后(例如10条),然后调用一下waitForConfirms进行批量确认,这虽然一定程度上降低普通确认的阻塞,但会导致部分消息的重复发送。

异步确认

Channel中提供addConfirmListener(ConfirmListener listener)方法来添加确认回调接口。

public interface ConfirmListener {    void handleAck(long deliveryTag, boolean multiple)        throws IOException;
    void handleNack(long deliveryTag, boolean multiple)        throws IOException;}

该回调接口中主要包含两个方法:

  • handleAck:用来处理Basic.Ack的命令
  • handleNack:用来处理Basic.Nack的命令

上面两个方法中都包含消息的唯一标志deliveryTag。我们需要为每信道维护一个"unConfirm"的消息序号集合,每发送一条消息,集合元素加1。每当调用一次 handleAck,未确认集合中删掉一条(multiple为false)或者多条(multiple为true)数据。unConfirm消息序号集合推荐使用有序集合SortedSet的存储结构。

消息分发和传输保障

消息分发

队列在有多个消费者,将会采用轮询的方式来分发消息,但这会导致性能差、处理消息速度慢的消费者堆积大量未处理消息,而性能好、处理消息速度快的消费者则处于空闲状态,严重时将会压垮性能差的消费者。为了避免上述情况,channel.basicQos方法允许限制信道上的消费者所能保持的最大未确认消息的数量。

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;void basicQos(int prefetchCount, boolean global) throws IOException;void basicQos(int prefetchCount) throws IOException;
  • prefetchSize:消费者所能接收未确认消息的总体大小的上限,单位为B
  • prefetchCount:消费者所能保持的最大未确认消息的数量
  • global:设置为true,指同一个新道上所有的消费者共同遵从最大未确认消息的数量,设置为false,指的是信道上的消费者单独遵守最大未确认消息的数量

举个例子对global进行说明:假如同一个信道上有consumerA,consumerB,现在prefetchCount被设置为了5,队列中有10条消息

  • global为true:假如consumerA有3条待确认消息,那么B最多只能接受2条消息
  • global为false:consumerA和consumerB都可以最多有5条待确认消息

还有下面这种情况:

channel.basicQos(3, false); channel.basicQos(5, true);

这种情况下两条语句都生效,consumerA和consumerB两个加起来的待确认消息不能超过5个,consumerA和consumerB两个消费者待确认的消息分别不能超过3个。

消息顺序

RabbitMQ暂时无法保证消息的顺序性,只能从业务着手,比如在消息体能添加全局的有序标识。

消息传输保障

一般消息中间件的消息 传输保障分为三个层级:

  • At most once:最多一次。消息可能会丢失,但绝不会重复传输
  • At least once:最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次。

最多一次随便发随便消费。

恰好一次目前主流的消息中间件都无法保证,业务只能在使用的时候自己做去重处理,这个后续讲述。

最少一次则可以利用我们之前的知识进行保证:

  1. 生产者使用发送方确认机制,保证消息发送不丢失
  2. 生产者需要借助mandatory参数和备份交换器保证消息能够正确路由到队列
  3. 消息和队列都要开启持久化,保证RabbitMQ重启未消费信息不丢失
  4. 消费者消费消息使用手动确认机制

本文分享自微信公众号 - shysh95(shysh95),作者:shysh95

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-04-03

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ基础使用

    生产消息的应用,生产者需要指定将消息发送到哪个exchange,并且指定routingkey(这是为了exchange可以将消息路由到相关的队列)。

    shysh95
  • RabbitMQ Federation

    在文章开始之前,我们先介绍一下联邦机制的基本概念。联邦机制的实现,依赖于RabbitMQ的Federation插件,该插件的主要目标是为了RabbitMQ可以在...

    shysh95
  • RabbitMQ Shovel

    Shovel能够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destinati...

    shysh95
  • RabbitMQ高级面试题

    在生产者投递消息时指定mandatory或者imrnediate参数设为 true 时,RabbitMQ 会把无法投递的消息通过Basic.Return 命令将...

    Java学习录
  • RabbitMQ快速入门

    最近一段项目实践中大量使用了基于RabbitMQ的消息中间件,也积累的一些经验和思考,特此成文,望大家不吝赐教。 本文包括RabbitMQ基本概念、进阶概念、...

    用户1216676
  • RabbitMQ实战:理解消息通信

    前段时间总结完了「深入浅出MyBatis」系列,对MyBatis有了更全面和深入的了解,在掘金社区也收到了一些博友的喜欢,很高兴。另外,短暂的陪产假就要结束了,...

    情情说
  • springboot + 消息队列

    第一种:用户注册信息写入数据库后在按照顺序先后发送注册邮件和短信,走完这三步后用户才完成注册

    桑鱼
  • 全网最全RabbitMQ总结,别再说你不会RabbitMQ

    当初我学RabbitMQ的时候,第一时间就上GitHub找相应的教程,但是令我很失望的是没有找到,Spring,Mybatis之类的教程很多,而RabbitMQ...

    Java识堂
  • RabbitMQ要点

    发送方确认模式:将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。一旦消息被投递到目的队列后,或者消息被写入磁...

    技术从心
  • RabbitMQ 面试要点

    原文链接:https://chaser520.iteye.com/blog/2428253

    业余草

扫码关注云+社区

领取腾讯云代金券