前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MQ教程 | 基于RabbitMQ消息延时队列

MQ教程 | 基于RabbitMQ消息延时队列

作者头像
Tinywan
发布2020-04-01 16:48:55
3.7K1
发布2020-04-01 16:48:55
举报
文章被收录于专栏:开源技术小栈

▍延迟任务应用场景

  • 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。
  • 订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。
  • 过1分钟给新注册会员的用户,发送注册邮件等。

▍RabbitMQ延迟队列实现的方式有两种

  • 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能。
  • 使用rabbitmq-delayed-message-exchange 插件实现延迟功能(注意:延迟插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的)

▍死信机制实现延迟队列

RabbitMQ没有直接去实现延迟队列这个功能。而是需要通过消息的TTL(Time To Live))和死信交换机(Dead Letter Exchanges)这两者的组合来实现。

▍消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。

超过了这个时间,认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。

可以通过设置消息的 expiration 字段或者队列 x-message-ttl 属性来设置时间,两者是一样的效果。

下面例子是通过设置消息的 expiration 字段实现死信,针对每条消息设置 TTL 是在发送消息的时候设置 expiration 参数,单位为毫秒

代码语言:javascript
复制
$body = 'Tinywan expiration!';$msg = new AMQPMessage($body);$msg->set("delivery_mode", AMQPMessage::DELIVERY_MODE_PERSISTENT); // 设置超时时间$msg->set("expiration", 30000); // ms 1000ms = 1s

上面的代码在向队列发送消息的时候,通过传递 { expiration: '30000'} 将这条消息的过期时间设为了 30秒,对消息设置 30秒 钟过期,这条消息并不一定就会在30秒钟后被丢弃或进入死信,只有当这条消息到达队首即将被消费时才会判断其是否过期,若未过期就会被消费者消费,若已过期就会被删除或者成为死信。

▍死信交换器(Dead Letter Exchanges)

RabbitMQ中有一种交换器叫 死信交换器,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它会被重新发送到另外一个交换器中,这个交换器就是 死信交换器,绑定在 死信交换器 上的队列就称之为 死信队列

队列中的消息在以下三种情况下会变成死信:

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
  • 上面的消息的TTL到了,消息过期了。
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

当队列中的消息成为死信以后,如果队列设置了DLX那么消息会被发送到DLX。通过x-dead-letter-exchange设置DLX,通过这个x-dead-letter-routing-key设置消息发送到DLX所用的routing-key,如果不设置默认使用消息本身的routing-key。

Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

死信交换器可以在程序中设置,也可以使用rabbitmqctl工具进行设置,关于死信交换器的介绍请参考RabbitMQ官网 https://www.rabbitmq.com/dlx.html

▍死信队列设置

1. 首先需要设置死信队列的exchange和queue,然后进行绑定:

代码语言:javascript
复制
Exchange: dlx.exchangeQueue: dlx.queueRoutingKey: ##表示只要有消息到达了Exchange,那么都会路由到这个queue上

2. 然后需要有一个监听,去监听这个队列进行处理

3. 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列加上一个参数即可

代码语言:javascript
复制
arguments.put(" x-dead-letter-exchange","dlx.exchange");

这样消息在过期、requeue、 队列在达到最大长度时,消息就可以直接路由到死信队列!

▍定时任务

因为队列中的消息过期后会成为死信,而死信又会被发布到该消息所在的队列的 DLX 上去,所以通过为消息设置过期时间,然后再消费该消息所在队列的 DLX 所绑定的队列,从而来达到定时处理一个任务的目的。

简单的讲就是当有一个队列 queue1,其 DLX 为 deadEx1,deadEx1 绑定了一个队列 deadQueue1,当队列 queue1 中有一条消息因过期成为死信时,就会被发布到 deadEx1 中去,通过消费队列 deadQueue1 中的消息,也就相当于消费的是 queue1 中的因过期产生的死信消息。

▍参考案例

发送消息

代码语言:javascript
复制
public static function delayQueueSend($param = []){    $connection = RabbitMqConnection::getConnection();    $channel = $connection->channel();//定义等待exchange// Fanout:该类型路由规则非常简单,会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,相当于广播功能。    $channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);//定义过期exchange    $channel->exchange_declare('expireExchange', 'fanout', false, false, false);//定义过期queue    $channel->queue_declare("expireQueue", false, false, false, false, false);//定义等待queue    $channel->queue_declare("waitSendQueue", false, false, false, false, false, new AMQPTable(["x-dead-letter-exchange" => "expireExchange"]));    $channel->queue_bind("waitSendQueue", "waitSendExchange");    $channel->queue_bind("expireQueue", "expireExchange");
    $body = 'Tinywan expiration!';    $msg = new AMQPMessage($body);    $msg->set("delivery_mode", AMQPMessage::DELIVERY_MODE_PERSISTENT); // 设置超时时间    $msg->set("expiration", 30000); // ms 1000ms = 1s
// 向等待exchage发布消息    $channel->basic_publish($msg, 'waitSendExchange');echo ' [x] Sent '. date('Y-m-d H:i:s') .': ', $body, "\n";    $channel->close();    $connection->close();}

接受消息(这里为阻塞模式)

代码语言:javascript
复制
public static function delayQueueReceive(){    $connection = RabbitMqConnection::getConnection();    $channel = $connection->channel();
//定义等待exchange    $channel->exchange_declare('waitSendExchange', 'fanout', false, false, false);//定义过期exchange    $channel->exchange_declare('expireExchange', 'fanout', false, false, false);
//定义过期queue    $channel->queue_declare("expireQueue", false, false, false, false, false);//定义等待queue    $channel->queue_declare("waitSendQueue", false, false, false, false, false, new AMQPTable(["x-dead-letter-exchange" => "expireExchange"]));    $channel->queue_bind("waitSendQueue", "waitSendExchange");    $channel->queue_bind("expireQueue", "expireExchange");
echo ' [*] Waiting for message. To exit press CTRL+C '.PHP_EOL;    $callback = function ($msg) {echo ' [x] Receive '. date('Y-m-d H:i:s') .':', $msg->body, "\n";    };// 订阅超时queue    $channel->basic_consume("expireQueue", "", false, true, false, false, $callback);
while (count($channel->callbacks)) {      $channel->wait();    }    $channel->close();    $connection->close();}

注意:如果声明的 expireExchange 是 direct 类型,那么在为其绑定队列的时候一定要指定 BindingKey,即这里的 deadLetterRoutingKey,如果不指定 Bindingkey,则需要将 expireExchange 声明为 fanout 类型。

fanout 类型路由规则非常简单,会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,相当于广播功能)。

▍运行结果

发送消息

接受消息

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Tinywan的杂货摊 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档