顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
n*2+1/min
),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复!延迟队列多用于需要延迟工作的场景。
最常见的是以下两种场景:
比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。
扫表存在的问题是
实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。
这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做OrderMessage
),订单消息需要延迟5到15秒后进行异步处理。
选用了基于Redis
的有序集合Sorted Set
和Crontab
短轮询进行实现。
Sorted Set
的member
和score
添加到订单队列Sorted Set
中。JSON
字符串分别作为field
和value
添加到订单队列内容Hash
中。Lua
脚本保证原子性。Sorted Set
的命令ZREVRANGEBYSCORE
弹出指定数量的订单ID
对应的订单队列内容Hash
中的订单推送内容数据进行处理。处理方案一
弹出订单内容数据的同时进行数据删除,也就是ZREVRANGEBYSCORE
、ZREM
和HDEL
命令要在同一个Lua
脚本中执行,这样的话Lua
脚本的编写难度大,并且由于弹出数据已经在Redis
中删除,如果数据处理失败则可能需要从数据库重新查询补偿。
处理方案二
弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列Sorted Set
和订单队列内容Hash
中对应的数据,这样的话需要控制并发,有重复执行的可能性。
选用了方案一,也就是从
Sorted Set
弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。
方案的流程图大概是这样:
ZADD
命令 - 将一个或多个成员元素及其分数值加入到有序集当中。
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN
ZREVRANGEBYSCORE
命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
MySQL
的LIMIT offset,size
一致,如果不指定此参数则返回整个集合的数据。[success]
ZREM
命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。
ZREM key member [member ...]
HMSET
命令 - 同时将多个field-value(字段-值)对设置到哈希表中。
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN
HDEL
命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。
HDEL KEY_NAME FIELD1.. FIELDN
Lua
脚本并且返回脚本的SHA-1
字符串:SCRIPT LOAD script
。Lua
脚本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]
。unpack
函数可以把table
类型的参数转化为可变参数,不过需要注意的是unpack
函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见Stackoverflow
的提问table.unpack() only returns the first element。如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。
enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil
将任务的执行时间作为score,要执行的任务数据作为value,存放在zset中
dequeue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
-- unpack函数能把table转化为可变参数
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
return nil
如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询。
注意:这里其实有一个性能隐患,命令
ZREVRANGEBYSCORE
的时间复杂度可以视为为O(N),N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候,dequeue
脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案
这里的出队使用Crontab
作为轮训去查询消费
<?php
/**
* @desc Redis 延迟任务队列
* @author Tinywan(ShaoBo Wan)
* @date 2024/05/02 11:36
*/
declare(strict_types=1);
namespace redis;
class RedisDelayQueue
{
// 生产者 脚本sha值
const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
// 消费者 脚本sha值
const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';
// 订单关闭
const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
// 订单关闭详情哈希
const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';
/**
* Redis 静态实例
* @return \Redis
*/
private static function _redis()
{
$redis = \redis\BaseRedis::server();
$redis->select(3);
return $redis;
}
/**
* @desc: 延迟队列 生产者
* @param string $keys1
* @param string $keys2
* @param string $member
* @param int $score
* @param array $message
* @return mixed
*/
public static function producer(string $keys1, string $keys2, string $member, int $score, array $message)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
return 1
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
}
$hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
}
/**
* @desc: 延迟队列 消费者
* @param string $keys1
* @param string $keys2
* @param int $maxScore
* @return mixed
*/
public static function consumer(string $keys1, string $keys2, int $maxScore)
{
$redis = self::_redis();
$scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
if (!$scriptSha) {
$script = <<<luascript
local status, type = next(redis.call('TYPE', KEYS[1]))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
if list ~= nil and #list > 0 then
redis.call('ZREM', KEYS[1], unpack(list))
local result = redis.call('HMGET', KEYS[2], unpack(list))
redis.call('HDEL', KEYS[2], unpack(list))
return result
end
end
end
luascript;
$scriptSha = $redis->script('load', $script);
$redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
}
return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 0, 10], 2);
}
}
用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。
private function delayQueueOrderClose()
{
$orderId = time();
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$score = time() + 60; // 延迟60秒执行
$message = [
'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
如果是ThinkPHP6 框架,执行该命令则可以生产消息,
php think crontab delay-queue-order-producer
循环
private function delayOrderProducer()
{
$keys1 = DelayQueue::KEY_ORDER_CLOSE;
$keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
for ($i = 1; $i <= 10; $i++) {
$orderId = 'S' . $i;
$score = time(); // 延迟60秒执行
$message = [
'event' => DelayQueue::EVENT_ORDER_CLOSE,
'order_id' => $orderId,
'create_time' => time()
];
$res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
var_dump($res);
}
}
1、通过Crontab 轮询执行
private function delayQueueOrderConsumer()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo ' [x] Message List is Empty, Try Again ', "\n";
return;
}
var_dump($queueList);
}
说明:如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询
2、阻塞执行
private function delayQueueOrderConsumerWhile()
{
$keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
$keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
while (true) {
$maxScore = time();
$queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
if (false === $queueList) {
echo ' [x] Message List is Empty, Try Again ', "\n";
sleep(1);
continue;
}
// 处理业务
foreach ($queueList as $queue) {
$messageArray = json_decode($queue, true);
}
}
}
方案一:弹出订单内容数据的同时进行数据删除,也就是
ZREVRANGEBYSCORE
、ZREM
和HDEL
命令要在同一个Lua
脚本中执行,这样的话Lua
脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。针对以上的解决方案就是:消息进入到延迟队列后,保证至少被消费一次。
ACK(将消息标记为"已处理")
ACK(将消息标记为"已处理")