前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis 延迟队列实现(基于PHP)

Redis 延迟队列实现(基于PHP)

作者头像
陈大剩博客
发布2023-03-06 09:05:12
8890
发布2023-03-06 09:05:12
举报
文章被收录于专栏:陈大剩博客专栏

延迟队列介绍

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。

例如:滴滴打车订单完成后,如果用户一直不评价,48小时后会将自动评价为5星。

Redis实现延迟队列

Redis 可以利用 zset (有序列表)来实现,将消息序列化成一个字符串作为 zsetvalue; 这个消息的到期处理时间作为 score,利用多个线程轮询 zset 获取到期的任务进行处理。 多线程是为了保证可用性,万一挂了一个线程还有其他线程可以继续处理; 因为有多个线程,所以需要考虑并发争抢任务,确保任务不会多次执行

延迟队列实现逻辑
延迟队列实现逻辑

代码实现

代码语言:javascript
复制
require_once("../RedisClient.php");
$client = RedisClient::getInstance();

//延时队列
function delay(string $message, int $timeout = 5)
{
    global $client;
    $time = microtime(true) + $timeout;
    return $client->zadd('delay:', [$message => $time]);
}

//顺序消费延迟队列中的消息
function loop()
{
    global $client;
    while (true) {
        //从延迟队列获取一条最近时间的消息
        $message_data = $client->zrangebyscore('delay:', '-inf', microtime(true), ['withscores' => true, 'limit' => [0, 1]]);
        //延迟队列中无消息
        if (!$message_data) {
            sleep(1);
            continue;
        }
        //提取消息数据
        $message = key($message_data);
        //从延迟队列中删除刚获取的消息
        $success = $client->zrem('delay:', $message);
        //多线程或多进程争抢消息时,
        //根据zrem返回值判断,当前实例有没有抢到任务
        //抢到任务,做业务处理后返回
        if ($success) {
            //do something..
            echo sprintf("消费的消息,[%s]", $message) . PHP_EOL;
        }
    }
}
//delay('test1');
//delay('test2');
//delay('test2');

//loop();

# php queue.php 
消费的消息,[mmm1]
消费的消息,[mmm2]
消费的消息,[mmm3]

进一步优化

细心的同学会发现上面算法代码中,有几处问题

  • 同一个任务被多个进程取到后再使用 zrem 进行争抢,没有抢到的进程白白浪费了一次任务;
  • 取出条数和删除只能一条,且 zrangebyscorezrem 不是原子操作;
  • 消息取出后,执行了一部分逻辑,服务器突然重启了,剩下的逻辑没有执行完成该如何处理?

我们可以通过使用lua脚本,解决前面两个问题,至于第三个问题可以通过代码层面其他数据库事务解决。

lua 脚本
lua 脚本
代码语言:javascript
复制
require_once("../RedisClient.php");
use Predis\Command\ScriptCommand;
$client = RedisClient::getInstance();

/**
 * 从消息队列中搜索符合条件的最近n条消息
 * 返回消息内容并从消息队列中删除
 * @param string queue_key 消息队列的key
 * @param int $min      搜索时间戳开始时间
 * @param int $max      搜索时间戳结束时间
 * @param int $offset   要跳过的消息数量
 * @param int $limit    获取消息数量
 * @return array 删除成功的消息的消息内容
 * @
 */
class getAndDeleteRecentMessageScript extends ScriptCommand {

    public function getKeysCount()
    {
        return 1;
    }

    public function getScript()
    {
        return <<<LUA
-- 消息队列的redisKey
local queue = KEYS[1]
-- 搜索范围的最大/最小值,偏移量和取值数量
local min, max, offset, count = ARGV[1], ARGV[2], ARGV[3], ARGV[4] 
local message = false
local messages = {}
local queue_value = {}
local insert = table.insert
-- 获取最近n条消息并删除消息
queue_value = redis.call("ZRANGEBYSCORE",queue,min,max,"LIMIT",offset,count)
for idx, message in pairs(queue_value) do
    if redis.call("ZREM",queue,message) then
        insert(messages, idx, message)
    end
end
-- 返回删除成功的消息
return messages
LUA;

    }
}
$client->getProfile()->defineCommand('get_and_delete_recent_message','getAndDeleteRecentMessageScript');

//向延迟队列中写入10条数据
foreach(range(1,10) as $msg_id){
    $success = delay("msg{$msg_id}");
    if($success){
        echo "写入消息[msg{$msg_id}], 成功" . PHP_EOL;
    }
}

//删除最近写入的 2条
$ret = $conn->get_and_delete_recent_message('delay:',0,microtime(true),3,2);

var_export($ret);

后记

延时队列是一个实现“延时消息”的好方法,解决了业务问题。至于可达性、幂等性未来另述。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 延迟队列介绍
  • Redis实现延迟队列
    • 代码实现
      • 进一步优化
      • 后记
      相关产品与服务
      云数据库 Redis
      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档