项目中经常需要做某个操作, 然后一定时间之后看这个操作的执行结果。 要么使用定时任务扫描, 要么使用延时队列(任务)来实现。
在主流的 MQ 中支持延时消息的有 RabbitMQ
和 RocketMQ
, 如果没有使用这个两个 MQ, 譬如使用了 Kafka, 又想使用延时消息的功能可以使用 Redis来实现。
Redis 实现延时队列有三种方案:
Zset(Sorted Set)
来实现redis 可以基于 list 来实现队列, 通过 LPop 和 RPush 保证先入先出。
在延时队列场景可以使用 zset
, 实现原理:
ZRANGEBYSCORE
获取到期的消息, 将到期的消息迁移到 List 即可(或不要这一步, 直接消费)将到期消息的往 list 的迁移需要三个动作:
这三个动作需要保证原子性(要么都成功,要么都失败), 可以使用 lua 脚本来实现。
备注: 虽然 redis 本身支持事务, 但是 redis 的事务机制不是那么合理, 当运行出错的时候会跳过出错的命令继续执行(只有语法错误才会失败), 并不能完全保证原子性, 所以大部分框架还是会选择用 lua 脚本
本章节摘取于其他文章的测试结果, 出处在参考资料
压测环境:
memtier_benchmark
Lpop
和 Rpush
的时间复杂度是O(1):zadd
复杂度是 O(M*log(N)), N是有序集的基数,M为成功添加的新成员的数量zadd benchmark 结果:
zrangebyscore
复杂度是 O(log(N)+M), N 为有序集的基数, M 为被结果集的基数。
KEYSPACE NOTIFICATIONS
监听过期事件: CONFIG SET notify-keyspace-events Ex
Keyspace Notifications,可以用于监控 Redis 内的 Key 和 Value的变化,包括 Key 过期事件。像监听过期 Key 的功能就是通过 Keyspace Notifications 实现的。 基本原理是:Pub/Sub。客户端通过订阅 Pub/Sub 频道,来感知事件的发生。
开启 KEYSPACE NOTIFICATIONS
:
# config set 或者 redis.conf 配置
notify-keyspace-events [参数](KEA)
# 禁用该功能 参数设置为空即
### 参数说明
至少需要 K E 中的一个
- K: 以 __keyspace@<db>__ 为前缀的 Keyspace events
- E: 以 __keyevent@<db>__ 为前缀的 Keyevent events
- m: 访问了不存在的 key
- n: 产生了新 key
- A: A是特殊的,代表下面所有的参数的总和,是"g$lshztxed"的别名(除去mnKE的全部)
- x: key 过期事件
- e: Redis内存满了,被内存淘汰的事件
- g: 通用命令
- $: String commands
- s: Set commands
- h: Hash commands
- z: Sorted set commands
- t: Stream commands
- d: Module key type events
Redis 可以通过 List 来做最简单的生产者消费者模式
Lpush
发送, 消费者做一个死循环 Rpop
消费
这种最简单的模式, 有个很明显的问题。 当队列没有数据的时候, 消费者 Rpop
仍然会不停的拉消息, 会造成CPU的空转以及对Redis产生额外的压力
当然, Redis 也提供了阻塞的拉取: BRpop/BLpop
, 这样可以避免CPU空转。不过这仍然有几个问题:
Redis Pub/Sub 机制可以解决这个问题: 生产者通过 Publish
往 channel
发送消息, 消费者通过 Subscribe
订阅 channle
, 一个 channel
可以被多个 Subscriber
订阅
不过仍然有这些缺陷:
Subscriber
不在线, 那么这个消费就会丢失, 消息的可靠性得不到保证Stream 是 Redis5.0 推出的一种数据结构, 主要用于当消息队列、日志分析等场景。 我们上面介绍的几种消息队列机制都有一些或多或少的问题:
Redis Steam 有如下机制:
Redis Stream 基本满足了消息队列的大部分要求
XADD
指令追加消息时自动创建。XGROUP CREATE
命令手动创建,在同一个 Stream 内消费者组名称唯一。一个消费组可以有多个消费者(Consumer)同时进行组内消费,所有消费者共享Stream内的所有信息,但同一条消息只会有一个消费者消费到,不同的消费者会消费 Stream 中不同的消息,这样就可以应用在分布式的场景中来保证消息消费的唯一性。last_delivered_id
往前移动。创建消费者组时需要指定从 Stream 的哪一个消息ID(哪个位置)开始消费,该位置之前的数据会被忽略,同时还用来初始化 last_delivered_id
这个变量。这个last_delivered_id一般来说就是最新消费的消息ID(这也是借用了 kafka commit offset 的概念)pending_ids
记录了 当前已经被客户端读取,但是还没有 ack 的消息 。 目的是为了保证客户端至少消费了消息一次,而不会在网络传输的中途丢失而没有对消息进行处理。如果客户端没有 ack,那么这个变量里面的消息ID 就会越来越多,一旦某个消息被 ack,它就会对应开始减少。这个变量也被 Redis 官方称为 PEL (Pending Entries List)XADD:向Stream中添加消息。如果指定的Stream不存在,则会自动创建
// MAXLEN maxlen:可选参数,用于限制Stream的最大长度。当Stream的长度达到maxlen时,旧的消息会被自动删除。
// ID id:可选参数,用于指定消息的ID。如果不指定该参数,Redis会自动生成一个唯一的ID。
// field1 value1 [field2 value2 ...]:消息的字段和值,消息的内容以key-value的形式存在。
XADD stream_name [MAXLEN maxlen] [ID id] field1 value1 [field2 value2 ...]
XREAD:以阻塞/非阻塞方式获取Stream中的消息列表。
// COUNT count:可选参数,用于指定一次读取的最大消息数量。如果不指定,默认为1。
// BLOCK milliseconds:也是一个可选参数,用于指定阻塞的时间(以毫秒为单位)。如果指定了阻塞时间,并且当前没有可消费的消息,客户端将在指定的时间内阻塞等待。如果不设置该参数或设置为0,则命令将立即返回,无论是否有可消费的消息。
// STREAMS key [key ...] ID [ID ...]:这部分指定了要消费的流(Streams)和对应的起始消息ID。可以一次指定多个流和对应的起始ID。
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREADGROUP:从消费者组中读取消息,支持阻塞读取。
XACK:确认消费者已经成功处理了消息。
XGROUP:用于管理消费者组,包括创建、设置ID、销毁消费者组等操作。
XPENDING:查询消费者组中的待处理消息。