专栏首页Web技术研发工具系列 | Redis Stream 类型的消息队列

工具系列 | Redis Stream 类型的消息队列

概述

Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。

同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中常用 API 进行分析。

Stream消息队列

  • 消息 ID 的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

添加消息(生产消息)

Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

XADD key ID field string [field string ...]

其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 memberMessage 为名称的流,进行操作。

1127.0.0.1:6379> XADD memberMessage * user kang msg Hello
2"1553439850328-0"
3127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
4"1553439858868-0"

上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1553439858868-0"

这里自动生成的 Id 格式为-Id 是由两部分组成:

  • millisecondsTime 为当前服务器时间毫秒时间戳。
  • sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。

比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

Id 中的前后部分必须为数字。最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否则,当不满足上述条件时,添加后会抛出异常:

(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

消息 ID 说明

XADD 生成的1553439850328-0,就是 Redis 生成的消息 ID,由两部分组成: 时间戳 - 序号。时间戳是毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个 64 位整型。

可以通过 multi 批处理,来验证序号的递增:

 1127.0.0.1:6379> MULTI
 2OK
 3127.0.0.1:6379> XADD memberMessage * msg one
 4QUEUED
 5127.0.0.1:6379> XADD memberMessage * msg two
 6QUEUED
 7127.0.0.1:6379> XADD memberMessage * msg three
 8QUEUED
 9127.0.0.1:6379> XADD memberMessage * msg four
10QUEUED
11127.0.0.1:6379> XADD memberMessage * msg five
12QUEUED
13127.0.0.1:6379> EXEC
141) "1553441006884-0"
152) "1553441006884-1"
163) "1553441006884-2"
174) "1553441006884-3"
185) "1553441006884-4"

由于一个 redis 命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此 Redis 生成的 ID 是_单调递增_有序的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。

读取消息(消费消息)

在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费

单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

 1127.0.0.1:6379> XREAD streams memberMessage 0
 21) 1) "memberMessage"
 3   2) 1) 1) "1553439850328-0"
 4         2) 1) "user"
 5            2) "kang"
 6            3) "msg"
 7            4) "Hello"
 8      2) 1) "1553439858868-0"
 9         2) 1) "user"
10            2) "zhong"
11            3) "msg"
12            4) "nihao"

XREAD 支持很多参数,语法格式为:

  • [COUNT count],用于限定获取的消息数量
  • [BLOCK milliseconds],用于设置 XREAD 为阻塞模式,默认为非阻塞模式
  • ID,用于设置由哪个消息 ID 开始读取。使用 0 表示从第一条消息开始。(本例中就是使用 0)此处需要注意,消息队列 ID 是单调递增的,所以通过设置起点,可以向后读取。

XRED 读消息时分为阻塞和非阻塞模式,使用 BLOCK 选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

阻塞模式用法:

1127.0.0.1:6379> XREAD block 1000 streams memberMessage $
2(nil)
3(1.07s)

使用 Block 模式,配合 $ 作为 ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD 负责生成消息,XREAD 负责消费消息。

群组消费

群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:「创建群组,群组读取消息,向服务端确认消息以处理。」

群组操作

操作群组使用 XGROUP 指令:

XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  • CREATE 创建消费组。
  • SETID 修改下一个处理消息的 Id。
  • DESTROY 销毁消费组。
  • DELCONSUMER 删除消费组中指定的消费者。

群组读取消息

群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息。

Pending 等待列表

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

 1127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
 21) (integer) 5 # 5个已读取但未处理的消息
 32) "1553585533795-0" # 起始ID
 43) "1553585533795-4" # 结束ID
 54) 1) 1) "consumerA" # 消费者A有3个
 6      2) "3"
 7   2) 1) "consumerB" # 消费者B有1个
 8      2) "1"
 9   3) 1) "consumerC" # 消费者C有1个
10      2) "1"
11
12127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
131) 1) "1553585533795-0" # 消息ID
14   2) "consumerA" # 消费者
15   3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
16   4) (integer) 5 # 消息被读取了5次,delivery counter
172) 1) "1553585533795-1"
18   2) "consumerA"
19   3) (integer) 1654355
20   4) (integer) 4
21# 共5个,余下3个省略 ...
22
23127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
241) 1) "1553585533795-0"
25   2) "consumerA"
26   3) (integer) 1641083
27   4) (integer) 5
28# 共3个,余下2个省略 ...

每个 Pending 的消息有 4 个属性:

  1. 消息 ID
  2. 所属消费者
  3. IDLE,已读取时长
  4. delivery counter,消息被读取次数

上面的结果我们可以看到,我们之前读取的消息,都被记录在 Pending 列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成

消息ACK

消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

 1127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
 2(integer) 1
 3
 4127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
 51) (integer) 4 # 已读取但未处理的消息已经变为4个
 62) "1553585533795-1"
 73) "1553585533795-4"
 84) 1) 1) "consumerA" # 消费者A,还有2个消息处理
 9      2) "2"
10   2) 1) "consumerB"
11      2) "1"
12   3) 1) "consumerC"
13      2) "1"
14127.0.0.1:6379> 

有了这样一个 Pending 机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。

此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转义给其他的消费者处理,就是消息转移。请继续。

消息转移

消息转移的操作时将某个消息转移到自己的 Pending 列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

 1# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
 2127.0.0.1:6379> XPENDING mq mqGroup - + 10
 31) 1) "1553585533795-1"
 4   2) "consumerA"
 5   3) (integer) 15907787
 6   4) (integer) 4
 7
 8# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
 9127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
101) 1) "1553585533795-1"
11   2) 1) "msg"
12      2) "2"
13
14# 消息1553585533795-1已经转移到消费者B的Pending中。
15127.0.0.1:6379> XPENDING mq mqGroup - + 10
161) 1) "1553585533795-1"
17   2) "consumerB"
18   3) (integer) 84404 # 注意IDLE,被重置了
19   4) (integer) 5 # 注意,读取次数也累加了1次

以上代码,完成了一次消息转移。转移除了要指定 ID 外,还需要指定 IDLE,保证是长时间未处理的才被转移。被转移的消息的 IDLE 会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了 IDLE,则可以避免后面的转移不会成功,因为 IDLE 不满足条件。例如下面的连续两条转移,第二条不会成功。

1127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
2127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息转移。至此我们使用了一个 Pending 消息的 ID,所属消费者和 IDLE 的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。

消费者组模式详解

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有 10 条消息,三个消费者都可以消费到这 10 条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有 10 条消息,三个消费者分别消费其中的某些消息,比如消费者 A 消费消息 1、2、5、8,消费者 B 消费消息 4、9、10,而消费者 C 消费消息 3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:

消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息 ID 等操作
  • XREADGROUP,分组消费消息操作

进行演示,演示时使用 5 个消息,思路是:创建一个 Stream 消息队列,生产者生成 5 条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

 1# 生产者生成10条消息
 2127.0.0.1:6379> MULTI
 3127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
 4127.0.0.1:6379> XADD mq * msg 2
 5127.0.0.1:6379> XADD mq * msg 3
 6127.0.0.1:6379> XADD mq * msg 4
 7127.0.0.1:6379> XADD mq * msg 5
 8127.0.0.1:6379> EXEC
 9 1) "1553585533795-0"
10 2) "1553585533795-1"
11 3) "1553585533795-2"
12 4) "1553585533795-3"
13 5) "1553585533795-4"
14
15# 创建消费组 mqGroup
16127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
17OK
18
19# 消费者A,消费第1条
20127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
211) 1) "mq"
22   2) 1) 1) "1553585533795-0"
23         2) 1) "msg"
24            2) "1"
25# 消费者A,消费第2条
26127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
271) 1) "mq"
28   2) 1) 1) "1553585533795-1"
29         2) 1) "msg"
30            2) "2"
31# 消费者B,消费第3条
32127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
331) 1) "mq"
34   2) 1) 1) "1553585533795-2"
35         2) 1) "msg"
36            2) "3"
37# 消费者A,消费第4条
38127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
391) 1) "mq"
40   2) 1) 1) "1553585533795-3"
41         2) 1) "msg"
42            2) "4"
43# 消费者C,消费第5条
44127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
451) 1) "mq"
46   2) 1) 1) "1553585533795-4"
47         2) 1) "msg"
48            2) "5"

上面的例子中,三个在同一组 mpGroup 消费者 A、B、C 在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

XGROUP CREATE mq mqGroup 0,用于在消息队列 mq 上创建消费组 mpGroup,最后一个参数 0,表示该组从第一条消息开始消费。(意义与 XREAD 的 0 一致)。除了支持CREATE外,还支持SETID设置起始 ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

可以进行组内消费的基本原理是,STREAM 类型会为每个组记录一个最后处理(交付)的消息 ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。

本文分享自微信公众号 - 万少波的播客(Tinywanblog),作者:Tinywan

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-02-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Redis中的Stream数据类型作为消息队列的尝试

    Redis的List数据类型作为消息队列,已经比较合适了,但存在一些不足,比如只能独立消费,订阅发布又无法支持数据的持久化,相对前两者,Redis Stream...

    Java_老男孩
  • Redis Stream——作为消息队列的典型应用场景

    Redis最新的大版本5.0已经RC1了,其中最重要的Feature莫过于 Redis Stream 了,关于Redis Stream的基本使用介绍和设计理念可...

    猿哥
  • Redis 中如何实现的消息队列?实现的方式有几种?

    细心的你可能发现了,本系列课程中竟然出现了三个课时都是在说消息队列,第 10 课时讲了程序级别的消息队列以及延迟消息队列的实现,而第 15 课时讲了常见的消息队...

    码农架构
  • Redis进阶-Stream多播的可持久化的消息队列

    PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。

    小小工匠
  • 消息队列Rabbitmq的交换器类型

    在rabbitmq中,生产者的消息都是通过交换器来接收,然后再从交换器分发到不同的队列中去,在分发的过程中交换器类型会影响分发的逻辑。

    汤青松
  • 消息队列Rabbitmq的交换器类型

    在rabbitmq中,生产者的消息都是通过交换器来接收,然后再从交换器分发到不同的队列中去,在分发的过程中交换器类型会影响分发的逻辑。

    汤青松
  • 消息队列面试解析系列(一)- 消息队列的意义

    见名知义,消息队列主要就是用来发送和接收处理消息,但它的作用可不仅解决应用间通信问题。

    JavaEdge
  • Redis实现消息队列的4种方案

    Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数据结构,如 字符串,散列,列表,集合,带有范围查询的排序集(sorted sets),...

    allsmallpig
  • 大型网站架构系列:消息队列

    大型网站架构系列:消息队列 一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最...

    用户1289394
  • 大型网站架构系列:消息队列

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件...

    后端技术探索
  • 大型网站架构系列:消息队列

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件...

    后端技术探索
  • 把Redis当作队列来用,真的合适吗?

    也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

    PHP开发工程师
  • 把Redis当作队列来用,真的合适吗?

    也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

    _Kaito
  • Python使用redis的消息队列

    Criss@陈磊
  • 消息队列面试解析系列(三)-消息模型辨析

    MQ都得有消息模型,就会产生比如队列(Queue)、主题(Topic)、分区(Partition)这些名词,但是概念上却不尽相同。

    JavaEdge
  • Redis系列(十六)应用之两种缓存和两种队列

    Redis 是一个很强大的内存数据库,而依据我学习 Redis 的经验,网上最缺的资料不是 Redis 的实现原理,Redis 的运维等等。而是对于 Redis...

    呼延十
  • 大型网站架构系列:消息队列(二)

    本文是大型网站架构系列:消息队列(二),主要分享JMS消息服务,常用消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)。 【第二篇...

    小小科
  • 使用redis stream实现队列服务

    Redis5.0引入了Stream类型。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:

    跑马溜溜的球
  • Redis系列 |(一)六种基本数据结构

    Redis 是一个开源,高级的键值存储和一个适用的解决方案,用于构建高性能,可扩展的 Web 应用程序。Redis 也被作者戏称为 数据结构服务器 ,这意味着使...

    Tinywan

扫码关注云+社区

领取腾讯云代金券