前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis5新特性Streams作消息队列

Redis5新特性Streams作消息队列

作者头像
ytao
发布2020-06-04 11:12:48
6210
发布2020-06-04 11:12:48
举报
文章被收录于专栏:ytaoytao

前言

Redis 5 新特性中,Streams 数据结构的引入,可以说它是在本次迭代中最大特性。它使本次 5.x 版本迭代中,Redis 作为消息队列使用时,得到更完善,更强大的原生支持,其中尤为明显的是持久化消息队列。同时,stream 借鉴了 kafka 的消费组模型概念和设计,使消费消息处理上更加高效快速。本文就 Streams 数据结构中常用 API 进行分析。

准备

本文所使用 Redis 版本为 5.0.5 。如果使用更早的 5.x 版本,有些 API 使用效果,与本文中描述略有不同。

添加消息

Streams 添加数据使用 XADD 指令进行添加,消息中的数据以 K-V 键值对的形式进行操作。一条消息可以存在多个键值对,添加命令格式:

代码语言:javascript
复制
XADD key ID field string [field string ...]

其中 key 为 Streams 的名称,ID 为消息的唯一标志,不可重复,field string 就为键值对。下面我们就添加以 person 为名称的流,进行操作。

代码语言:javascript
复制
XADD person * name ytao des https://ytao.top

上面添加案例中,ID 使用 * 号复制,这里代表着服务端自动生成 Id,添加后返回数据 "1578238486193-0"

这里自动生成的 Id 格式为 <millisecondsTime>-<sequenceNumber>Id 是由两部分组成:

  1. millisecondsTime 为当前服务器时间毫秒时间戳。
  2. sequenceNumber 当前序列号,取值来源于当前毫秒内,生成消息的顺序,默认从 0 开始加 1 递增。

比如:1578238486193-3 表示在 1578238486193 毫秒的时间戳时,添加的第 4 条消息。

除了服务端自动生成 Id 方式外,也支持指定 Id 的生成,但是指定 Id 有以下条件限制:

  1. Id 中的前后部分必须为数字。
  2. 最小 Id 为 0-1,不能为 0-0,但是 2-0,3-0 .... 是被允许的。
  3. 添加的消息,Id 的前半部分不能比存在 Id 最大的值小,Id 后半部分不能比存在前半部分相同的最大后半部分小。

否则,当不满足上述条件时,添加后会抛出异常:

代码语言:javascript
复制
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item

实际上,当添加一条消息时,会进行两部操作。第一步,先判断如果不存在 Streams,则创建 Streams 的名称,再添加消息到 Streams 中。即使添加消息时,由于 Id 异常,也可以在 Redis 中存在以当前 Streams 的名称。Streams 中 Id 也可作为指针使用,因为它是一个有序的标记。

生产中,如果这样使用添加消息,会存在一个问题,那就是消息数量太大时,会使服务宕机。这里 Streams 的设计初期也有考虑到这个问题,那就是可以指定 Streams 的容量。如果容量操作这个设定的值,就会对调旧的消息。在添加消息时,设置 MAXLEN 参数。

代码语言:javascript
复制
XADD person MAXLEN 5 * name ytao des https://ytao.top

这样就指定该了 Streams 中的容量为 5 条消息。也可使用 XTRIM 截取消息,从小到大剔除多余的消息:

代码语言:javascript
复制
XTRIM person MAXLEN 8

消息数量

查看消息数量使用 XLEN 指令进行操作。

代码语言:javascript
复制
XLEN key

例:查看 person 流中的消息数量:

代码语言:javascript
复制
> XLEN person
(integer) 5

查询消息

查询 Streams 中的消息使用 XRANGEXREVRANGE 指令。

XRANGE

查询数据时,可以按照指定 Id 范围进行查询,XRANGE 查询指令格式:

代码语言:javascript
复制
XRANGE key start end [COUNT count]

参数说明:

  • key 为 Streams 的名称
  • start 为范围查询开始 Id,包含本 Id。
  • start 为范围查询结束 Id,包含本 Id。
  • Count 为查询返回最大的消息数量,非必填。

这里 start 和 end 有 -+两个非指定值,他们分别表示无穷小和无穷大,所以当使用这个两个值时,会查询出全部的消息。

代码语言:javascript
复制
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"

上面查询的消息数据,可以看到是按照先进先出的顺序查询出来的。

使用 COUNT 指定查询返回的数量:

代码语言:javascript
复制
# 查询所有的消息,并且返回一条数据
> XRANGE person - + COUNT 1
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

在范围查询中,Id 的后半部分可省略,后半部分中的数据会全部查询到。

XREVRANGE

XREVRANGE 的查询和 XRANGE 指令中的使用类似,但查询的 start 和 end 参数顺序进行了调换:

代码语言:javascript
复制
XREVRANGE key end start [COUNT count]

使用案例:

代码语言:javascript
复制
> XREVRANGE person +  -
1) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"

查询后的结果与 XRANGE 的结果顺序刚好相反,其他都一样,这两个指令可进行消息的升序和降序的返回。

删除消息

删除消息使用 XDEL 指令操作,只需指定将要删除的 Streams 名称和 Id 即可,支持一次删除多个消息 。

代码语言:javascript
复制
XDEL key ID [ID ...]

删除案例:

代码语言:javascript
复制
# 查询所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
3) 1) "2-0"
   2) 1) "name"
      2) "gaga"
      3) "des"
      4) "fishion!"
# 删除消息
> XDEL person 2-0
(integer) 1
# 再次查询删除后的所有消息
> XRANGE person - +
1) 1) "0-1"
   2) 1) "name"
      2) "ytao"
      3) "des"
      4) "https://ytao.top"
2) 1) "0-2"
   2) 1) "name"
      2) "luffy"
      3) "des"
      4) "valiant!"
# 查询删除后的长度
> XLEN person
(integer) 2

从上面可以看到,删除消息后,长度也会减少相应的数量。

消费消息

在 Redis 的 PUB/SUB 中,我们是通过订阅来消费消息,在 Streams 数据结构中,同样也能实现同等功能,当没有新的消息时,可进行阻塞等待。不仅支持单独消费,而且还可以支持群组消费。

单独消费

单独消费使用 XREAD 指令。可以看到,下面命令中,STREAMS,key, 以及 ID 为必填项。ID 表示将要读取大于该 ID 的消息。当 ID 值使用 $ 赋予时,表示已存在消息的最大 Id 值。

代码语言:javascript
复制
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

上面的 COUNT 参数用来指定读取的最大数量,与 XRANGE 的用法一样。

代码语言:javascript
复制
> XREAD COUNT 1 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"

> XREAD COUNT 2 STREAMS person 0
1) 1) "person"
   2) 1) 1) "0-1"
         2) 1) "name"
            2) "ytao"
            3) "des"
            4) "https://ytao.top"
      2) 1) "0-2"
         2) 1) "name"
            2) "luffy"
            3) "des"
            4) "valiant!"

XREAD 里面还有个 BLOCK 参数,这个是用来阻塞订阅消息的, BLOCK 携带的参数为阻塞时间,单位为毫秒,如果在这个时间内没有新的消息消费,那么就会释放该阻塞。当这里的时间指定为 0 时,会一直阻塞,直到有新的消息来消费到。

代码语言:javascript
复制
# 窗口 1 开启阻塞,等待新消息的到来
> XREAD BLOCK 0 STREAMS person $

# 另开一个连接窗口 2,添加一条新的消息
> XADD person 2-2 name tao des coder
"2-2"

# 窗口 1,获取到有新的消息来消费,并且带有阻塞的时间
> XREAD BLOCK 0 STREAMS person $
1) 1) "person"
   2) 1) 1) "2-2"
         2) 1) "name"
            2) "tao"
            3) "des"
            4) "coder"
(60.81s)

当使用 XREAD 进行顺序消费时,需要额外记录下读取到位置的 Id,方便下次继续消费。

群组消费

群组消费的主要目的也就是为了分流消息给不同的客户端处理,以更高效的速率处理消息。为达到这一肝功能需求,我们需要做三件事:创建群组群组读取消息向服务端确认消息以处理

群组操作

操作群组使用 XGROUP 指令:

代码语言:javascript
复制
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

上面命令中,包含操作有:

  • CREATE 创建消费组。
  • SETID 修改下一个处理消息的 Id。
  • DESTROY 销毁消费组。
  • DELCONSUMER 删除消费组中指定的消费者。

我们当前需要使用的是创建消费组:

代码语言:javascript
复制
# 以当前存在的最大 Id 作为消费起始
> XGROUP CREATE person group1 $
OK

群组读取消息

群组读取使用 XREADGROUP 指令, COUNTBLOCK的使用类似 XREAD 的操作,只是多了个群组和消费者的指定:

代码语言:javascript
复制
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

由于群组消费和单独消费类似,这里只进行个阻塞分析,这里 Id 也有个特殊值 >,表示还未进行消费的消息:

代码语言:javascript
复制
# 窗口 1,消费群组中,taotao 消费者建立阻塞监听
XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >

# 窗口 2,消费群组中,yangyang 消费者建立阻塞监听
XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >

# 窗口 3,添加消费消息
> XADD person 3-1 name tony des 666
"3-1"

# 窗口 1,读取到新消息,此时 窗口 2 没有任何反应
> XREADGROUP GROUP group1 taotao BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-1"
         2) 1) "name"
            2) "tony"
            3) "des"
            4) "666"
(77.54s)

# 窗口 3,再次添加消费消息
> XADD person 3-2 name james des abc!
"3-2"

# 窗口 2,读取到新消息,此时 窗口 1 没有任何反应
> XREADGROUP GROUP group1 yangyang BLOCK 0 STREAMS person >
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"
(76.36s)

以上执行流程中,group1 群组中有两个消费者,当添加两条消息后,这两个消费者轮流消费。

消息ACK

消息消费后,为避免再次重复消费,这是需要向服务端发送 ACK,确保消息被消费后的标记。例如下列情况,我们上面我们将最新两条消息已进行了消费,但是当我们再次读取消息时,还是被读到:

代码语言:javascript
复制
>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) 1) 1) "3-2"
         2) 1) "name"
            2) "james"
            3) "des"
            4) "abc!"

这时,我们使用 XACK 指令告诉服务器,我们已处理的消息:

代码语言:javascript
复制
XACK key group ID [ID ...]0

让服务器标记 3-2 已处理:

代码语言:javascript
复制
> XACK person group1 3-2
(integer) 1

再次获取群组读取消息:

代码语言:javascript
复制
>  XREADGROUP GROUP group1 yangyang STREAMS person 0
1) 1) "person"
   2) (empty list or set)

队列中没有了可读消息。除了上面以讲解到的 API 外,查看消费群组信息可使用 XINFO 指令查看,本文不做分析。

总结

上面对 Streams 常用 API 进行了分析,我们可以感受到 Redis 在消息队列支持的道路上,也越来越强大。如果使用过它的 PUB/SUB 功能的话,就会感受到 5.x 迭代正是将你的一些痛点进行了优化。

关注【ytao】,更多原创好文

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-01-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ytao 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 准备
  • 添加消息
  • 消息数量
  • 查询消息
    • XRANGE
      • XREVRANGE
      • 删除消息
      • 消费消息
        • 单独消费
          • 群组消费
            • 群组操作
            • 群组读取消息
            • 消息ACK
        • 总结
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档