首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >使用Redis实现一个延时队列

使用Redis实现一个延时队列

原创
作者头像
闫同学
发布2025-09-12 20:21:27
发布2025-09-12 20:21:27
20900
代码可运行
举报
运行总次数:0
代码可运行

延时队列本质上是一个 时间驱动的队列

  • 生产者:将任务放入队列时,指定一个“延迟时间”或“执行时间”。
  • 消费者:不断检查队列,只有当任务到期时才取出来处理。

Redis 的数据结构里,我们有几个选择:

  1. ZSET(有序集合)
  • 分值(score)用作任务的执行时间(时间戳)。
  • 成为延时队列最常用方案。
  • 查询方式:通过 ZRANGEBYSCORE 获取到期任务。
  1. LIST
  • 可以用作普通队列,但 LIST 本身不支持按时间排序。
  • 如果使用 LIST,需要在应用层自己做排序或者轮询,非常不高效。
  1. Stream
  • Redis 5.0 引入的 Stream,也可以用作延时队列,但实现较复杂。
  • 常用在消息队列场景。

推荐方案:使用 ZSET(有序集合),简洁高效。


基于 ZSET 的方案设计

1.1 生产者(添加任务)

  • 将任务序列化(JSON 或字符串)。
  • 用任务的 执行时间戳 作为 ZSET 的 score。
  • 添加到 ZSET:
代码语言:go
复制
ZADD delay_queue <execute_timestamp> <task_json>

示例:

代码语言:bash
复制
ZADD delay_queue 1694505600 '{"task_id": 123, "type":"email"}'

这里 1694505600 是 UNIX 时间戳,表示任务执行时间。


1.2 消费者(轮询任务)

消费者不断轮询 ZSET,获取 score ≤ 当前时间的任务:

代码语言:bash
复制
ZRANGEBYSCORE delay_queue -inf <current_timestamp>
  • 获取到的任务就是到期任务。
  • 处理完任务后,从 ZSET 中删除:
代码语言:bash
复制
ZREM delay_queue <task_json>

1.3 实现细节

  1. 轮询方式
  • 可以用简单的 for 循环 + sleep
  • 也可以用 Redis 的 阻塞方式,例如 Lua 脚本和 BRPOP 配合,但 ZSET 队列没有阻塞命令,需要轮询。
  1. Lua 脚本原子化
  • 为了避免多个消费者同时获取任务造成重复消费,可以用 Lua 脚本一次完成:
代码语言:c
代码运行次数:0
运行
复制
 * 查询到期任务
 * 删除任务
 * 返回任务

示例 Lua:

代码语言:lua
复制
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
if #tasks > 0 then
    redis.call('ZREM', KEYS[1], unpack(tasks))
end
return tasks

调用:

代码语言:bash
复制
EVAL <lua_script> 1 delay_queue <current_timestamp> 10

这里 10 表示一次最多获取 10 条任务。

  1. 任务幂等性
  • 处理任务可能失败,确保消费逻辑幂等。
  • 可在任务 JSON 中加任务 ID。
  1. 多消费者
  • 使用 Lua 脚本保证原子性。
  • 每个消费者轮询时获取到的任务不会重复。
  1. 定时优化
  • 为了降低轮询频率,可以使用 Redis Key 过期事件 + ZSET,或者在应用层做 动态 sleep,根据最近任务的执行时间决定下一次轮询时间。

Go 语言示例(生产者 + 消费者)

代码语言:go
复制
// 添加任务
func AddTask(rdb *redis.Client, queue string, delay time.Duration, task string) error {
    ts := time.Now().Add(delay).Unix()
    return rdb.ZAdd(context.Background(), queue, &redis.Z{
        Score:  float64(ts),
        Member: task,
    }).Err()
}

// 消费任务
func PopTasks(rdb *redis.Client, queue string, batchSize int) ([]string, error) {
    now := time.Now().Unix()
    lua := `
    local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
    if #tasks > 0 then
        redis.call('ZREM', KEYS[1], unpack(tasks))
    end
    return tasks
    `
    res, err := rdb.Eval(context.Background(), lua, []string{queue}, now, batchSize).Result()
    if err != nil {
        return nil, err
    }
    var tasks []string
    for _, t := range res.([]interface{}) {
        tasks = append(tasks, t.(string))
    }
    return tasks, nil
}

优化方案

  1. 延迟任务量很大时
  • 可以分多个 ZSET,按天/小时划分,避免单个 ZSET 太大。
  1. 精度优化
  • 轮询频率可根据最近任务的最小时间间隔动态调整。
  1. 持久化
  • Redis 支持 RDB/AOF,确保任务不会丢失。
  • 也可以在任务状态持久化到数据库,做二次保证。

总结

  • Redis 延时队列的核心是 ZSET
  • 核心思路:
  1. 用任务的执行时间作为 score。
  2. 消费者通过 ZRANGEBYSCORE 获取到期任务。
  3. Lua 脚本保证原子性。
  4. 可扩展性好,多消费者安全,适合秒级延时任务。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基于 ZSET 的方案设计
    • 1.1 生产者(添加任务)
    • 1.2 消费者(轮询任务)
    • 1.3 实现细节
  • Go 语言示例(生产者 + 消费者)
  • 优化方案
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档