前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >工具系列 | Redis Stream 类型的消息队列

工具系列 | Redis Stream 类型的消息队列

作者头像
Tinywan
发布2021-02-25 16:27:17
1.4K0
发布2021-02-25 16:27:17
举报
文章被收录于专栏:开源技术小栈

概述

Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。

同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中常用 API 进行分析。

Stream消息队列

  • 消息 ID 的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控

添加消息(生产消息)

Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

代码语言:javascript
复制
XADD key ID field string [field string ...]

其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 memberMessage 为名称的流,进行操作。

代码语言:javascript
复制
1127.0.0.1:6379> XADD memberMessage * user kang msg Hello
2"1553439850328-0"
3127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
4"1553439858868-0"

上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1553439858868-0"

这里自动生成的 Id 格式为-Id 是由两部分组成:

  • millisecondsTime 为当前服务器时间毫秒时间戳。
  • sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。

比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

Id 中的前后部分必须为数字。最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否则,当不满足上述条件时,添加后会抛出异常:

代码语言:javascript
复制
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

消息 ID 说明

XADD 生成的1553439850328-0,就是 Redis 生成的消息 ID,由两部分组成: 时间戳 - 序号。时间戳是毫秒级单位,是生成消息的 Redis 服务器时间,它是个 64 位整型(int64)。序号是在这个毫秒时间点内的消息序号,它也是个 64 位整型。

可以通过 multi 批处理,来验证序号的递增:

代码语言:javascript
复制
 1127.0.0.1:6379> MULTI
 2OK
 3127.0.0.1:6379> XADD memberMessage * msg one
 4QUEUED
 5127.0.0.1:6379> XADD memberMessage * msg two
 6QUEUED
 7127.0.0.1:6379> XADD memberMessage * msg three
 8QUEUED
 9127.0.0.1:6379> XADD memberMessage * msg four
10QUEUED
11127.0.0.1:6379> XADD memberMessage * msg five
12QUEUED
13127.0.0.1:6379> EXEC
141) "1553441006884-0"
152) "1553441006884-1"
163) "1553441006884-2"
174) "1553441006884-3"
185) "1553441006884-4"

由于一个 redis 命令的执行很快,所以可以看到在同一时间戳内,是通过序号递增来表示消息的。

为了保证消息是有序的,因此 Redis 生成的 ID 是_单调递增_有序的。由于 ID 中包含时间戳部分,为了避免服务器时间错误而带来的问题(例如服务器时间延后了),Redis 的每个 Stream 类型数据都维护一个 latest_generated_id 属性,用于记录最后一个消息的 ID。若发现当前时间戳退后(小于 latest_generated_id 所记录的),则采用时间戳不变而序号递增的方案来作为新消息 ID(这也是序号为什么使用 int64 的原因,保证有足够多的的序号),从而保证 ID 的单调递增性质。

读取消息(消费消息)

在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费

单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

代码语言:javascript
复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

代码语言:javascript
复制
 1127.0.0.1:6379> XREAD streams memberMessage 0
 21) 1) "memberMessage"
 3   2) 1) 1) "1553439850328-0"
 4         2) 1) "user"
 5            2) "kang"
 6            3) "msg"
 7            4) "Hello"
 8      2) 1) "1553439858868-0"
 9         2) 1) "user"
10            2) "zhong"
11            3) "msg"
12            4) "nihao"

XREAD 支持很多参数,语法格式为:

  • [COUNT count],用于限定获取的消息数量
  • [BLOCK milliseconds],用于设置 XREAD 为阻塞模式,默认为非阻塞模式
  • ID,用于设置由哪个消息 ID 开始读取。使用 0 表示从第一条消息开始。(本例中就是使用 0)此处需要注意,消息队列 ID 是单调递增的,所以通过设置起点,可以向后读取。

XRED 读消息时分为阻塞和非阻塞模式,使用 BLOCK 选项可以表示阻塞模式,需要设置阻塞时长。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。

BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

阻塞模式用法:

代码语言:javascript
复制
1127.0.0.1:6379> XREAD block 1000 streams memberMessage $
2(nil)
3(1.07s)

使用 Block 模式,配合 $ 作为 ID,表示读取最新的消息,若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

因此,典型的队列就是 XADD 配合 XREAD Block 完成。XADD 负责生成消息,XREAD 负责消费消息。

群组消费

群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:「创建群组,群组读取消息,向服务端确认消息以处理。」

群组操作

操作群组使用 XGROUP 指令:

代码语言:javascript
复制
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  • CREATE 创建消费组。
  • SETID 修改下一个处理消息的 Id。
  • DESTROY 销毁消费组。
  • DELCONSUMER 删除消费组中指定的消费者。
群组读取消息

群组读取使用 XREADGROUP 指令,COUNT和BLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

代码语言:javascript
复制
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值>,表示还未进行消费的消息。

Pending 等待列表

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息。演示如下:

代码语言:javascript
复制
 1127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情况
 21) (integer) 5 # 5个已读取但未处理的消息
 32) "1553585533795-0" # 起始ID
 43) "1553585533795-4" # 结束ID
 54) 1) 1) "consumerA" # 消费者A有3个
 6      2) "3"
 7   2) 1) "consumerB" # 消费者B有1个
 8      2) "1"
 9   3) 1) "consumerC" # 消费者C有1个
10      2) "1"
11
12127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 选项可以获取详细信息
131) 1) "1553585533795-0" # 消息ID
14   2) "consumerA" # 消费者
15   3) (integer) 1654355 # 从读取到现在经历了1654355ms,IDLE
16   4) (integer) 5 # 消息被读取了5次,delivery counter
172) 1) "1553585533795-1"
18   2) "consumerA"
19   3) (integer) 1654355
20   4) (integer) 4
21# 共5个,余下3个省略 ...
22
23127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消费者参数,获取具体某个消费者的Pending列表
241) 1) "1553585533795-0"
25   2) "consumerA"
26   3) (integer) 1641083
27   4) (integer) 5
28# 共3个,余下2个省略 ...

每个 Pending 的消息有 4 个属性:

  1. 消息 ID
  2. 所属消费者
  3. IDLE,已读取时长
  4. delivery counter,消息被读取次数

上面的结果我们可以看到,我们之前读取的消息,都被记录在 Pending 列表中,说明全部读到的消息都没有处理,仅仅是读取了。那如何表示消费者处理完毕了消息呢?使用命令 XACK 完成告知消息处理完成

消息ACK

消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

代码语言:javascript
复制
 1127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息处理结束,用消息ID标识
 2(integer) 1
 3
 4127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
 51) (integer) 4 # 已读取但未处理的消息已经变为4个
 62) "1553585533795-1"
 73) "1553585533795-4"
 84) 1) 1) "consumerA" # 消费者A,还有2个消息处理
 9      2) "2"
10   2) 1) "consumerB"
11      2) "1"
12   3) 1) "consumerC"
13      2) "1"
14127.0.0.1:6379> 

有了这样一个 Pending 机制,就意味着在某个消费者读取消息但未处理后,消息是不会丢失的。等待消费者再次上线后,可以读取该 Pending 列表,就可以继续处理该消息了,保证消息的有序和不丢失。

此时还有一个问题,就是若某个消费者宕机之后,没有办法再上线了,那么就需要将该消费者 Pending 的消息,转义给其他的消费者处理,就是消息转移。请继续。

消息转移

消息转移的操作时将某个消息转移到自己的 Pending 列表中。使用语法XCLAIM来实现,需要设置组、转移的目标消费者和消息 ID,同时需要提供 IDLE(已被读取时长),只有超过这个时长,才能被转移。演示如下:

代码语言:javascript
复制
 1# 当前属于消费者A的消息1553585533795-1,已经15907,787ms未处理了
 2127.0.0.1:6379> XPENDING mq mqGroup - + 10
 31) 1) "1553585533795-1"
 4   2) "consumerA"
 5   3) (integer) 15907787
 6   4) (integer) 4
 7
 8# 转移超过3600s的消息1553585533795-1到消费者B的Pending列表
 9127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
101) 1) "1553585533795-1"
11   2) 1) "msg"
12      2) "2"
13
14# 消息1553585533795-1已经转移到消费者B的Pending中。
15127.0.0.1:6379> XPENDING mq mqGroup - + 10
161) 1) "1553585533795-1"
17   2) "consumerB"
18   3) (integer) 84404 # 注意IDLE,被重置了
19   4) (integer) 5 # 注意,读取次数也累加了1次

以上代码,完成了一次消息转移。转移除了要指定 ID 外,还需要指定 IDLE,保证是长时间未处理的才被转移。被转移的消息的 IDLE 会被重置,用以保证不会被重复转移,以为可能会出现将过期的消息同时转移给多个消费者的并发操作,设置了 IDLE,则可以避免后面的转移不会成功,因为 IDLE 不满足条件。例如下面的连续两条转移,第二条不会成功。

代码语言:javascript
复制
1127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
2127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

这就是消息转移。至此我们使用了一个 Pending 消息的 ID,所属消费者和 IDLE 的属性,还有一个属性就是消息被读取次数,delivery counter,该属性的作用由于统计消息被读取的次数,包括被转移也算。

消费者组模式详解

当多个消费者(consumer)同时消费一个消息队列时,可以重复的消费相同的消息,就是消息队列中有 10 条消息,三个消费者都可以消费到这 10 条消息。

但有时,我们需要多个消费者配合协作来消费同一个消息队列,就是消息队列中有 10 条消息,三个消费者分别消费其中的某些消息,比如消费者 A 消费消息 1、2、5、8,消费者 B 消费消息 4、9、10,而消费者 C 消费消息 3、6、7。也就是三个消费者配合完成消息的消费,可以在消费能力不足,也就是消息处理程序效率不高时,使用该模式。该模式就是消费者组模式。如下图所示:

消费者组模式的支持主要由两个命令实现:

  • XGROUP,用于管理消费者组,提供创建组,销毁组,更新组起始消息 ID 等操作
  • XREADGROUP,分组消费消息操作

进行演示,演示时使用 5 个消息,思路是:创建一个 Stream 消息队列,生产者生成 5 条消息。在消息队列上创建一个消费组,组内三个消费者进行消息消费:

代码语言:javascript
复制
 1# 生产者生成10条消息
 2127.0.0.1:6379> MULTI
 3127.0.0.1:6379> XADD mq * msg 1 # 生成一个消息:msg 1
 4127.0.0.1:6379> XADD mq * msg 2
 5127.0.0.1:6379> XADD mq * msg 3
 6127.0.0.1:6379> XADD mq * msg 4
 7127.0.0.1:6379> XADD mq * msg 5
 8127.0.0.1:6379> EXEC
 9 1) "1553585533795-0"
10 2) "1553585533795-1"
11 3) "1553585533795-2"
12 4) "1553585533795-3"
13 5) "1553585533795-4"
14
15# 创建消费组 mqGroup
16127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # 为消息队列 mq 创建消费组 mgGroup
17OK
18
19# 消费者A,消费第1条
20127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #消费组内消费者A,从消息队列mq中读取一个消息
211) 1) "mq"
22   2) 1) 1) "1553585533795-0"
23         2) 1) "msg"
24            2) "1"
25# 消费者A,消费第2条
26127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq > 
271) 1) "mq"
28   2) 1) 1) "1553585533795-1"
29         2) 1) "msg"
30            2) "2"
31# 消费者B,消费第3条
32127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerB COUNT 1 STREAMS mq > 
331) 1) "mq"
34   2) 1) 1) "1553585533795-2"
35         2) 1) "msg"
36            2) "3"
37# 消费者A,消费第4条
38127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerA count 1 STREAMS mq > 
391) 1) "mq"
40   2) 1) 1) "1553585533795-3"
41         2) 1) "msg"
42            2) "4"
43# 消费者C,消费第5条
44127.0.0.1:6379> XREADGROUP GROUP mqGroup consumerC COUNT 1 STREAMS mq > 
451) 1) "mq"
46   2) 1) 1) "1553585533795-4"
47         2) 1) "msg"
48            2) "5"

上面的例子中,三个在同一组 mpGroup 消费者 A、B、C 在消费消息时(消费者在消费时指定即可,不用预先创建),有着互斥原则,消费方案为,A->1, A->2, B->3, A->4, C->5。语法说明为:

XGROUP CREATE mq mqGroup 0,用于在消息队列 mq 上创建消费组 mpGroup,最后一个参数 0,表示该组从第一条消息开始消费。(意义与 XREAD 的 0 一致)。除了支持CREATE外,还支持SETID设置起始 ID,DESTROY销毁组,DELCONSUMER删除组内消费者等操作。

XREADGROUP GROUP mqGroup consumerA COUNT 1 STREAMS mq >,用于组mqGroup内消费者consumerA在队列mq中消费,参数>表示未被组内消费的起始消息,参数count 1表示获取一条。语法与XREAD基本一致,不过是增加了组的概念。

可以进行组内消费的基本原理是,STREAM 类型会为每个组记录一个最后处理(交付)的消息 ID(last_delivered_id),这样在组内消费时,就可以从这个值后面开始读取,保证不重复消费。

以上就是消费组的基础操作。除此之外,消费组消费时,还有一个必须要考虑的问题,就是若某个消费者,消费了某条消息,但是并没有处理成功时(例如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了。下面继续讨论解决方案。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-02-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 万少波的播客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • Stream消息队列
  • 添加消息(生产消息)
  • 消息 ID 说明
  • 读取消息(消费消息)
    • 单独消费
      • 群组消费
        • 群组操作
        • 群组读取消息
        • Pending 等待列表
        • 消息ACK
        • 消息转移
    • 消费者组模式详解
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档