前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【redis】 属于redis的 “消息队列”:redis stream(浅析)

【redis】 属于redis的 “消息队列”:redis stream(浅析)

作者头像
看、未来
发布2021-12-27 13:16:54
1.2K0
发布2021-12-27 13:16:54
举报
文章被收录于专栏:CSDN搜“看,未来”

文章目录

关于 redis stream

这以前只知道redis有类似于消息队列的发布/订阅,还真不知道它居然悄咪咪的有“消息队列”呀哈。

redis stream 实现了大部分消息队列的功能,如:

代码语言:javascript
复制
消息ID的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
ACK确认机制

发布/订阅 模式不能算是真正意义上的消息队列,它有一定的实时性,而且没有做持久化。 不过redis stream 和卡夫卡之类的消息队列也没法比,毕竟它是在内存里的,小。

redis stream 使用示例

官网命令文档参考

添加消息

XADD命令可以发送消息到指定 Stream 消息流中(若不存在则创建)。

使用示例:

代码语言:javascript
复制
XADD mystream 1526919030474-55 键 值

XADD Stream名 消息ID message 消息内容

至于官方文档或者其他博客如果提到什么 *,我也不说啥。

在这里插入图片描述
在这里插入图片描述

如果指定的ID参数是*字符,XADD命令将自动生成唯一的ID。自动生成ID时,第一部分是生成ID的Redis实例的Unix时间(以毫秒为单位)。第二部分只是序列号,用于区分在同一毫秒内生成的ID。

ID保证总是递增的,因此条目在流中是完全有序的。为了保证此属性,如果流中的当前top ID的时间大于实例的当前本地时间,则将使用top entry time,并且ID的序列部分将增加。例如,当本地时钟向后跳时,或者在故障切换后,新主机具有不同的绝对时间时,可能会发生这种情况。

在这里插入图片描述
在这里插入图片描述

当用户为XADD指定显式ID时,最小有效ID为0-1,并且用户必须指定一个大于流中当前任何其他ID的ID,否则命令将失败并返回错误。

在这里插入图片描述
在这里插入图片描述

这个事情告诉我们,如果没有什么特殊需求,就用系统自动生成的ID吧。

版本变迁:

代码语言:javascript
复制
>=6.2:增加了NOMKSTREAM选项、MINID微调策略和LIMIT选项。
>=7.0:添加了对<ms>-*显式ID表单的支持。
使用示例
代码语言:javascript
复制
redis>  XADD mystream * name Sara surname OConnor

"1640412909358-0"

redis>  XADD mystream * field1 value1 field2 value2 field3 value3

"1640412909358-1"

redis>  XLEN mystream

(integer) 2

redis>  XRANGE mystream - +

1) 1) "1640412909358-0"
   2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1640412909358-1"
   2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"

redis>
时间复杂度

O(1) when adding a new entry, O(N) when trimming where N being the number of entries evicted.

读取消息

XREAD

XREAD可用于从消息流中读取数据。

在这里插入图片描述
在这里插入图片描述

格式应该看得出来吧。

最后的参数是消息ID,redis会返回大于该ID的消息。 “0-0”是一个特殊的ID,代表最小的消息ID,使用它可以要求redis从头读取消息。

XREAD 也可以阻塞客户端,等待消息流中接收新的消息。为了进行阻塞,将使用block选项以及超时前要阻塞的毫秒数。

在这里插入图片描述
在这里插入图片描述

通常,Redis阻塞命令以秒为单位超时,但是此命令需要毫秒超时,即使服务器的超时分辨率通常接近0.1秒。

通常这个命令这样使用乎好一些:

代码语言:javascript
复制
XREAD BLOCK 1000 STREAMS mystream $

$ 也是一个特殊ID,表示当前最大的消息ID。使用它可以要求redis读取最新的消息。

时间复杂度

Time complexity: For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.

XRANGE

范围读取,字面意思。

这里也有特殊字符可以看一下:+、- The - and + special IDs mean respectively the minimum ID possible and the maximum ID possible inside a stream。

在这里插入图片描述
在这里插入图片描述

可以使用count来限制数量:

在这里插入图片描述
在这里插入图片描述

删除消息

XDEL

这个也没啥好说的,直接来个示例即可。

代码语言:javascript
复制
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
   2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
   2) 1) "c"
      2) "3"
XTRIM

使用 XTRIM 对流进行修剪,限制长度。

代码语言:javascript
复制
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
   2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>
 
redis>

现在也不想写太多,毕竟还没用过。官方文档放在开头了,可自行阅读。 等我下个月用过之后就有的写了。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/12/25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 关于 redis stream
  • redis stream 使用示例
    • 添加消息
      • 使用示例
      • 时间复杂度
    • 读取消息
      • XREAD
      • XRANGE
    • 删除消息
      • XDEL
      • XTRIM
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档