stream是redis最复杂的一个数据结构, 也是redis5.0的一个重要更新. 有很多值得学习的点. 这里会做个小系列, 从基础使用到源码解析.
stream实际上是一个消息发布订阅功能组件, 也就是消息队列. 这样的数据结构其实很常见, 比如腾讯云的cmq. 当然还有kafka等.
image
除了更好的性能, 它最大的特性是可以获取历史消息, 可以持久化消息. 它主要由消息、生产者、消费者、消费组4部分组成. 这里消费组可能让人有些困惑, 其实就是消费组里面有多个消费者, 他们互相竞争.
xadd用来创建, 每个stream有一个唯一key, *意味着让系统给你返回id, id是由unix时间和从0开始下标组成, 也就是这一毫秒的第几个条目. 你可以自己设定, 但是要确保严格单调递增. 后面就是键值对, 也就是消息本身.
xadd mystream * str1 hello str2 world
你可以用xlen查看信息数, 也可以用xinfo stream查看stream信息.
image
通过xdel可以删除消息, 但是注意, 其实没有删除, 只是设置了标志位.
image
xrange可以范围遍历, -代表最前, +代表最后, 你可以将其换成某个id, 因为是严格递增的, 所以, 遍历结果是一样的.
image
del mystream
前面说到了, 有消费者的概念, 那么它就可以消费消息队列的任何消息. 每读一次, 迭代器都会前进一次. 或者你可以阻塞读, 这里block后面跟毫秒, 如果是0就是一直阻塞.
image
首先可以建组, 这里最后是id, 0就代表从最前面开始获取消息, 可以写成$, 意味着或许新消息. 然后起一个消费者的名字, 比如这里的alice, > 意味着, 只把没有分发给别人的消息发给我,这也是最常用的方式. 当然你可以起名别的, 来获取之后的消息, 但是注意消费者是竞争关系, 迭代器会一直向前.
image