Redis Stream 实践

1. 前言

redis 5 中有一个重大新特性:stream。

stream 是一个日志形式的存储结构,可以往里追加数据,每条数据都会生成一个时间戳ID,stream 也有便捷的读取数据的模型。

stream 的特性使其适合做消息队列和时间序列存储。

下面通过实践来深入了解stream,由于内容较长,我也准备了PDF版本,文章底部有下载地址。

2. 安装环境

需要使用最新的5.0版本,这里使用 docker redis 容器:

docker run --name redis5 -p 6379:6379 -d redis:5.0-rc3

redis 客户端:

docker run -it --link redis5:redis --rm redis redis-cli -h redis -p 6379

启动后进入交互命令行:

redis:6379>

3. 实践

3.1 向stream添加元素

stream 元素可以是一个或多个键值对,添加:

redis:6379> XADD mystream * sensor-id 1234 temperature 19.8
1531989605376-0

解析:

  • mystream 是 stream的key
  • * 所在位置的参数的含义是元素ID,* 表示由系统自动生成一个元素ID
  • 添加的元素包含2个键值对,sensor-id 1234 和 temperature 19.8
  • 返回值是新增元素的ID,由时间戳和递增数字构成

获取Stream中元素的数量:

redis:6379> XLEN mystream
(integer) 1

3.2 范围查询

需要指定起止ID,相当于给一个时间范围:

redis:6379> XRANGE mystream 1531989605376 1531989605377
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

可以使用 - 代表最小ID, + 代表最大ID:

redis:6379> XRANGE mystream - +
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

当返回元素太多时,可以限定返回结果数量,就像数据库查询时的分页,通过 COUNT 参数指定:

redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"

还可以反向查询,使用 XREVRANGE 命令即可,用法与 XRANGE 相同。

3.3 监听 stream 的新元素

redis:6379> XREAD COUNT 2 STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531989605376-0
         2) 1) "sensor-id"
            2) "1234"
            3) "temperature"
            4) "19.8"

STREAMS 后面的 mystream 指定的是目标 stream 的 key, 0 是指最小的ID,就是获取指定stream中的大于指定ID的元素, COUNT 指获取的数量

可以一起指定多个stream,例如 STREAMS mystream otherstream00

阻塞监听

在客户端1中执行:

redis:6379> XREAD BLOCK 0 STREAMS mystream $

会进入等待状态。

在客户端2中添加元素:

redis:6379> XADD mystream * test 1

客户端1中会显示刚刚添加的元素:

1) 1) "mystream"
   2) 1) 1) 1531994510562-0
         2) 1) "test"
            2) "1"

BLOCK 表示阻塞, 0 是指定超时时间,0 表示永不超时, $ 表示stream中的最大ID。

3.4 消费者组

当stream量很大,或者消费者处理过程比较耗时的时候,只有一个消费者的话压力就比较大了,redis stream 提供了消费者组的概念,可以让多个消费者处理同一个stream,可以实现负债均衡。

例如有3个消费者 C1、C2、C3,stream 中有7个消息元素,那么消费的分配就是:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

3.4.1 创建消费者组

redis:6379> XGROUP CREATE mystream mygroup01 $
OK

针对 mystream 这个 stream 创建了一个消费者组,名字为 mygroup01$ 表示读取目前最大ID之后的元素。

3.4.2 添加测试数据

添加几条新数据:

redis:6379> XADD mystream * message apple
1531999977149-0
redis:6379> XADD mystream * message orange
1531999980272-0
redis:6379> XADD mystream * message strawberry
1531999983493-0
redis:6379> XADD mystream * message apricot
1531999988458-0
redis:6379> XADD mystream * message banana
1531999991782-0

3.4.3 通过消费者组读数据

redis:6379> XREADGROUP GROUP mygroup01 Alice COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1531999977149-0
         2) 1) "message"
            2) "apple"

Alice 是组成员的名字,> 的含义是:到目前为止没有被组内成员读取过的数据。

可以看到,组成员不需要提前创建,第一次使用时自动创建。

下面再创建1个成员来读取数据:

redis:6379> XREADGROUP GROUP mygroup01 Bob COUNT 1 STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) 1531999980272-0
         2) 1) "message"
            2) "orange"

3.4.4 消费历史

redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531999977149-0
         2) 1) "message"
            2) "apple"

这里最后指定的ID是 0,这样可以拿到悬而未决的历史数据,就是:自己曾经消费过,但没有发送消费确认的历史数据,这样可以让我们做故障恢复后的完善工作。

3.4.5 消费确认

redis:6379> XACK mystream mygroup01 1531999977149-0
(integer) 1

1531999977149-0Alice 消费的那条 apple 数据,再查看下 Alice 的消费历史:

redis:6379> XREADGROUP GROUP mygroup01 Alice STREAMS mystream 0
1) 1) "mystream"
   2) (empty list or set)

已经空了。

3.4.6 失败处理

通过上面可以了解到,当某个消费者出现问题然后恢复了之后,可以拿到自己还没有确认过的消息数据,这个一个安全保障机制,但如果这个出问题的消费者再也恢复不了了怎么办?他的那些还没确认过的消息数据是不是就没办法处理了?

redis stream 提供了这种情况的处理办法,通过2个步骤来解决:

  1. 查出所有已传递但未确认的消息数据
  2. 变更这些数据的所有者

这样就可以让新的消费者来处理这些数据了。

列出未处理的数据:

redis:6379> XPENDING mystream mygroup01 - + 10
1) 1) 1531999980272-0
   2) "Bob"
   3) (integer) 45126376
   4) (integer) 2
2) 1) 1531999983493-0
   2) "Tom"
   3) (integer) 867475
   4) (integer) 1

可以看到有2条数据未处理,列出了每条数据的 ID、所有者、此条消息的闲置时间(毫秒)、此消息被传递的次数。

声明变更所有者:

redis:6379> XCLAIM mystream mygroup01 Gates 3600 1531999980272-0 1531999983493-0
1) 1) 1531999980272-0
   2) 1) "message"
      2) "orange"
2) 1) 1531999983493-0
   2) 1) "message"
      2) "strawberry"

把指定2个ID的消息给了Gates3600 是指最小闲置时间,就是把指定消息中闲置时间大于3600的分配给Gates,注意Gates是全新的消费者,之前没有声明过,说明分配给新的消费者也是可以的。

查询一下Gates现在未处理的数据:

redis:6379> XREADGROUP GROUP mygroup01 Gates STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) 1531999980272-0
         2) 1) "message"
            2) "orange"
      2) 1) 1531999983493-0
         2) 1) "message"
            2) "strawberry"

可以看到新分配的2条数据。

3.5 查看 stream 相关信息

基本信息:

redis:6379> XINFO STREAM mystream
 1) length
 2) (integer) 15
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2
 9) first-entry
10) 1) 1531989605376-0
    2) 1) "sensor-id"
       2) "1234"
       3) "temperature"
       4) "19.8"
11) last-entry
12) 1) 1531999991782-0
    2) 1) "message"
       2) "banana"

消费组信息:

redis:6379> XINFO GROUPS mystream
1) 1) name
   2) "mygroup"
   3) consumers
   4) (integer) 3
   5) pending
   6) (integer) 5
2) 1) name
   2) "mygroup01"
   3) consumers
   4) (integer) 4
   5) pending
   6) (integer) 2

某个组中消费者的信息:

redis:6379> XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "Alice"
   3) pending
   4) (integer) 3
   5) idle
   6) (integer) 2483388
2) 1) name
   2) "Bob"
   3) pending
   4) (integer) 2
   5) idle
   6) (integer) 48453755
3) 1) name
   2) "Gates"
   3) pending
   4) (integer) 0
   5) idle
   6) (integer) 2385114

3.7 删除消息数据

先查一下现有数据:

redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531989605376-0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "19.8"
2) 1) 1531994510562-0
   2) 1) "test"
      2) "1"

删除第一条数据:

redis:6379> XDEL mystream 1531989605376-0
(integer) 1

再次查看,之前的第一条数据已经没有了:

redis:6379> XRANGE mystream - + COUNT 2
1) 1) 1531994510562-0
   2) 1) "test"
      2) "1"
2) 1) 1531994516257-0
   2) 1) "test"
      2) "2"

注意:XDEL 并不是真正的从内存中删除,只是做了标识,不会回收内存

3.8 设置stream最大长度

添加数据,同时指定了最大长度为2:

redis:6379> XADD mystream MAXLEN 2 * value 1
1532049865028-0
redis:6379> XADD mystream MAXLEN 2 * value 2
1532049872075-0
redis:6379> XADD mystream MAXLEN 2 * value 3
1532049877554-0

上面添加了3条数据,下面看一下stream 的长度和现在的内容:

redis:6379> XLEN mystream
(integer) 2

redis:6379> XRANGE mystream - +
1) 1) 1532049872075-0
   2) 1) "value"
      2) "2"
2) 1) 1532049877554-0
   2) 1) "value"
      2) "3"

可以看到只有2条数据。

4. 小结

以上就是 redis stream 的基础操作,实践一遍之后就会对 stream 有个全面的了解。

PDF 下载地址:

https://pan.baidu.com/s/1j91evQWfJHFxMXftTxSi6A

原文发布于微信公众号 - 性能与架构(yogoup)

原文发表时间:2018-07-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏佳爷的后花媛

php基础(二)

输出b,if中的空值赋值给$num,因此if条件必定为false,还有其他一些大同小异的题目,用=和==判断for循环的,只要把握好基本的概念就行了

2532
来自专栏阿杜的世界

Redis学习札记

Redis支持两种持久化方式,一种是RDB方式(快照:根据指定的规则“定时”将内存中的数据存储在硬盘上),另一种是AOF方式(在每次执行命令后都将命令本身记录下...

1083
来自专栏信安之路

PHP 代码审计之死磕 SQL 注入

代码审计中对 SQL 注入的审计是很常见的,那么要怎样才能审计出一个 SQL 注入呢?

1560
来自专栏SDNLAB

SDNLAB技术分享(三):OpenDaylight中编程抽象的实现

这次主要分三部分说一下,首先我会粗略介绍一下maple system。 之后将以这个为例来阐述一下ODL模块的开发过程。 最后会说明一下ODL模块的结构。 目前...

35710
来自专栏程序员的知识天地

使用 JS 实现一个本地数据库

前端很多时候还是需要保存一些数据的,这里的保存指的是长久的保存。以前的思想是把数据保存在 Cookie 中,或者将 key 保存在 Cookie 中,将其他数据...

2952
来自专栏JetpropelledSnake

Django学习笔记之Queryset的高效使用

对象关系映射 (ORM) 使得与SQL数据库交互更为简单,不过也被认为效率不高,比原始的SQL要慢。

1253
来自专栏腾讯Bugly的专栏

《Android 创建线程源码与OOM分析》

| 导语 企鹅FM近几个版本的外网Crash出现很多OutOfMemory(以下简称OOM)问题,Crash的堆栈都在Thread::start方法上。该文详细...

9215
来自专栏Java Web

Java I/O不迷茫,一文为你导航!

学习过计算机相关课程的童鞋应该都知道,I/O 即输入Input/ 输出Output的缩写,最容易让人联想到的就是屏幕这样的输出设备以及键盘鼠标这一类的输入设备,...

1052
来自专栏JetpropelledSnake

Python面试题之Python面试题汇总

(1)与java相比:在很多方面,Python比Java要简单,比如java中所有变量必须声明才能使用,而Python不需要声明,用少量的代码构建出很多功能;...

4.1K3
来自专栏有趣的django

35.Django2.0文档

第四章 模板  1.标签 (1)if/else {% if %} 标签检查(evaluate)一个变量,如果这个变量为真(即,变量存在,非空,不是布尔值假),系...

56710

扫码关注云+社区