延时队列本质上是一个 时间驱动的队列:
Redis 的数据结构里,我们有几个选择:
ZRANGEBYSCORE
获取到期任务。推荐方案:使用 ZSET(有序集合),简洁高效。
ZADD delay_queue <execute_timestamp> <task_json>
示例:
ZADD delay_queue 1694505600 '{"task_id": 123, "type":"email"}'
这里
1694505600
是 UNIX 时间戳,表示任务执行时间。
消费者不断轮询 ZSET,获取 score ≤ 当前时间的任务:
ZRANGEBYSCORE delay_queue -inf <current_timestamp>
ZREM delay_queue <task_json>
for
循环 + sleep
。 * 查询到期任务
* 删除任务
* 返回任务
示例 Lua:
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
if #tasks > 0 then
redis.call('ZREM', KEYS[1], unpack(tasks))
end
return tasks
调用:
EVAL <lua_script> 1 delay_queue <current_timestamp> 10
这里
10
表示一次最多获取 10 条任务。
// 添加任务
func AddTask(rdb *redis.Client, queue string, delay time.Duration, task string) error {
ts := time.Now().Add(delay).Unix()
return rdb.ZAdd(context.Background(), queue, &redis.Z{
Score: float64(ts),
Member: task,
}).Err()
}
// 消费任务
func PopTasks(rdb *redis.Client, queue string, batchSize int) ([]string, error) {
now := time.Now().Unix()
lua := `
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, ARGV[2])
if #tasks > 0 then
redis.call('ZREM', KEYS[1], unpack(tasks))
end
return tasks
`
res, err := rdb.Eval(context.Background(), lua, []string{queue}, now, batchSize).Result()
if err != nil {
return nil, err
}
var tasks []string
for _, t := range res.([]interface{}) {
tasks = append(tasks, t.(string))
}
return tasks, nil
}
总结
ZRANGEBYSCORE
获取到期任务。原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。