前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis 如何实现延时任务队列

Redis 如何实现延时任务队列

作者头像
Tinywan
发布2024-05-11 17:05:59
1210
发布2024-05-11 17:05:59
举报
文章被收录于专栏:开源技术小栈开源技术小栈

简介

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

延时任务和定时任务区别

  • 延时任务有别于定时任务,定时任务往往是固定周期的,有明确的触发时间。
  • 而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件。
  • 任务事件生成时并不想让消费者立即拿到,而是延迟一定时间后才接收到该事件进行消费。

业务场景

  • 订单超时,用户下单后进入支付页面(通常会有超时限制)超过15分钟没有进行操作,那么这个订单就需要作废处理。
  • 如何定期检查处于退款状态的订单是否已经退款成功?
  • 注册后到现在已经一周的用户,如何发短信撩动。
  • 交易信息双重效验防止因系统级/应用级/用户级等各种异常情况发生后导致的全部/部分丢失的订单信息。
  • 实现重复通知,默认失败连续通知10次(通知间隔为n*2+1/min),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复!

使用场景

延迟队列多用于需要延迟工作的场景。

最常见的是以下两种场景:

1、延迟消费
  1. 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。
2、延迟重试

比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。

扫表存在的问题是

  • 扫表与数据库长时间连接,在数量量大的情况容易出现连接异常中断,需要更多的异常处理,对程序健壮性要求高
  • 在数据量大的情况下延时较高,规定内处理不完,影响业务,虽然可以启动多个进程来处理,这样会带来额外的维护成本,不能从根本上解决。
  • 每个业务都要维护一个自己的扫表逻辑。当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似

缓存队列设计

场景设计

实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。

这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做OrderMessage),订单消息需要延迟5到15秒后进行异步处理。

延时队列的实现

选用了基于Redis的有序集合Sorted SetCrontab短轮询进行实现。

具体方案是:
  1. 订单创建的时候,订单ID和当前时间戳分别作为Sorted Setmemberscore添加到订单队列Sorted Set中。
  2. 订单创建的时候,订单ID和推送内容JSON字符串分别作为fieldvalue添加到订单队列内容Hash中。
  3. 第1步和第2步操作的时候用Lua脚本保证原子性。
  4. 使用一个异步线程通过Sorted Set的命令ZREVRANGEBYSCORE弹出指定数量的订单ID对应的订单队列内容Hash中的订单推送内容数据进行处理。
对于第4点处理有两种方案:

处理方案一

弹出订单内容数据的同时进行数据删除,也就是ZREVRANGEBYSCOREZREMHDEL命令要在同一个Lua脚本中执行,这样的话Lua脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。

处理方案二

弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列Sorted Set和订单队列内容Hash中对应的数据,这样的话需要控制并发,有重复执行的可能性。

选用了方案一,也就是从Sorted Set弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。

方案的流程图大概是这样:

Redis相关命令

Sorted Set相关命令

ZADD命令 - 将一个或多个成员元素及其分数值加入到有序集当中。

代码语言:javascript
复制
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN

ZREVRANGEBYSCORE命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。

代码语言:javascript
复制
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
  • max:分数区间 - 最大分数。
  • min:分数区间 - 最小分数。
  • WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。
  • LIMIT:可选参数,offset和count原理和MySQLLIMIT offset,size一致,如果不指定此参数则返回整个集合的数据。

[success] ZREM命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

代码语言:javascript
复制
ZREM key member [member ...]
Hash相关命令

HMSET命令 - 同时将多个field-value(字段-值)对设置到哈希表中。

代码语言:javascript
复制
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN

HDEL命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。

代码语言:javascript
复制
HDEL KEY_NAME FIELD1.. FIELDN
Lua 语法
  • 加载Lua脚本并且返回脚本的SHA-1字符串:SCRIPT LOAD script
  • 执行已经加载的Lua脚本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]
  • unpack函数可以把table类型的参数转化为可变参数,不过需要注意的是unpack函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见Stackoverflow的提问table.unpack() only returns the first element。

如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。

Lua 脚本

入队 enqueue.lua
代码语言:javascript
复制
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

将任务的执行时间作为score,要执行的任务数据作为value,存放在zset中

出队 dequeue.lua
代码语言:javascript
复制
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil

如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询。

注意:这里其实有一个性能隐患,命令ZREVRANGEBYSCORE的时间复杂度可以视为为O(N),N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候,dequeue脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案

这里的出队使用Crontab 作为轮训去查询消费

业务核心代码

延迟队列类 RedisDelayQueue.php
代码语言:javascript
复制
<?php

/**
 * @desc Redis 延迟任务队列
 * @author Tinywan(ShaoBo Wan)
 * @date 2024/05/02 11:36
 */

declare(strict_types=1);

namespace redis;

class RedisDelayQueue
{
    // 生产者 脚本sha值
    const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA';
    // 消费者 脚本sha值
    const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA';

    // 订单关闭
    const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE';
    // 订单关闭详情哈希
    const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH';

    /**
     * Redis 静态实例
     * @return \Redis
     */
    private static function _redis()
    {
        $redis = \redis\BaseRedis::server();
        $redis->select(3);
        return $redis;
    }

    /**
     * @desc: 延迟队列 生产者
     * @param string $keys1
     * @param string $keys2
     * @param string $member
     * @param int $score
     * @param array $message
     * @return mixed
     */
    public static function producer(string $keys1, string $keys2, string $member, int $score, array $message)
    {
        $redis = self::_redis();
        $scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA);
        if (!$scriptSha) {
            $script = <<<luascript
            redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
            redis.call('HSET', KEYS[2], ARGV[2], ARGV[3])
            return 1
luascript;
            $scriptSha = $redis->script('load', $script);
            $redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha);
        }
        $hashValue = json_encode($message, JSON_UNESCAPED_UNICODE);
        return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2);
    }

    /**
     * @desc: 延迟队列 消费者
     * @param string $keys1
     * @param string $keys2
     * @param int $maxScore
     * @return mixed
     */
    public static function consumer(string $keys1, string $keys2, int $maxScore)
    {
        $redis = self::_redis();
        $scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA);
        if (!$scriptSha) {
            $script = <<<luascript
            local status, type = next(redis.call('TYPE', KEYS[1]))
            if status ~= nil and status == 'ok' then
                if type == 'zset' then
                    local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4])
                    if list ~= nil and #list > 0 then
                        redis.call('ZREM', KEYS[1], unpack(list))
                        local result = redis.call('HMGET', KEYS[2], unpack(list))
                        redis.call('HDEL', KEYS[2], unpack(list))
                        return result
                    end
                end
            end
luascript;
            $scriptSha = $redis->script('load', $script);
            $redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha);
        }
        return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0,  0, 10], 2);
    }
}

用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。

脚本命令行

生产者消息
代码语言:javascript
复制
private function delayQueueOrderClose()
{
    $orderId = time();
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $score = time() + 60; // 延迟60秒执行
    $message = [
        'event' => RedisDelayQueue::EVENT_ORDER_CLOSE,
        'order_id' => $orderId,
        'create_time' => time()
    ];
    $res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
    var_dump($res);
}

如果是ThinkPHP6 框架,执行该命令则可以生产消息,php think crontab delay-queue-order-producer

循环

代码语言:javascript
复制
private function delayOrderProducer()
{
    $keys1 = DelayQueue::KEY_ORDER_CLOSE;
    $keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH;
    for ($i = 1; $i <= 10; $i++) {
        $orderId = 'S' . $i;
        $score = time(); // 延迟60秒执行
        $message = [
            'event' => DelayQueue::EVENT_ORDER_CLOSE,
            'order_id' => $orderId,
            'create_time' => time()
        ];
        $res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message);
        var_dump($res);
    }
}
消费者消息

1、通过Crontab 轮询执行

代码语言:javascript
复制
private function delayQueueOrderConsumer()
{
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    $maxScore = time();
    $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
    if (false === $queueList) {
        echo ' [x] Message List is Empty, Try Again ', "\n";
        return;
    }
    var_dump($queueList);
}

说明:如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询

2、阻塞执行

代码语言:javascript
复制
private function delayQueueOrderConsumerWhile()
{
    $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE;
    $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH;
    while (true) {
        $maxScore = time();
        $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore);
        if (false === $queueList) {
            echo ' [x] Message List is Empty, Try Again ', "\n";
            sleep(1);
            continue;
        }
        // 处理业务
        foreach ($queueList as $queue) {
            $messageArray = json_decode($queue, true);
        }
   }
}

数据删除为处理问题

方案一:弹出订单内容数据的同时进行数据删除,也就是ZREVRANGEBYSCOREZREMHDEL命令要在同一个Lua脚本中执行,这样的话Lua脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。

针对以上的解决方案就是:消息进入到延迟队列后,保证至少被消费一次。

  • 消费延迟队列消息后(zset结构中扫描到期的消息),不及时消费
  • 把读取的消息放入一个 redis stream 队列,同时加入消费组
  • 通过消费组消费 redis stream 消费,处理业务逻辑
  • Redis Stream 消费组,读取消息处理并且 ACK(将消息标记为"已处理")
  • 如果消息读取但是没处理,则进入XPENDING 列表,进行二次消费并且 ACK(将消息标记为"已处理")
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-05-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开源技术小栈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 延时任务和定时任务区别
  • 业务场景
  • 使用场景
    • 1、延迟消费
      • 2、延迟重试
      • 缓存队列设计
      • 场景设计
      • 延时队列的实现
        • 具体方案是:
          • 对于第4点处理有两种方案:
          • Redis相关命令
            • Sorted Set相关命令
              • Hash相关命令
                • Lua 语法
                • Lua 脚本
                  • 入队 enqueue.lua
                    • 出队 dequeue.lua
                    • 业务核心代码
                      • 延迟队列类 RedisDelayQueue.php
                      • 脚本命令行
                        • 生产者消息
                          • 消费者消息
                          • 数据删除为处理问题
                          相关产品与服务
                          云数据库 Redis
                          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档