前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Redis stream Java API实践

Redis stream Java API实践

作者头像
FunTester
发布2022-07-08 15:20:25
7740
发布2022-07-08 15:20:25
举报
文章被收录于专栏:FunTester

最近工作中使用到了消息中间件,另外一个组的同事经过评估选择了Redis stream作为最终选择。我自己写的性能测试框架自然也需要接入这套消息系统。所以我也抓紧学习起来。

Redis Stream 是 Redis 5.0 版本新增加的数据结构。Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

之前还没发现Redis还有这种使用方法,着实有点少见过怪了。照例我后面会进行一些基本功能的性能测试,下面分享基本功能的使用演示。

准备工作

依赖

如果想自己操作的话,请注意这个版本,因为在我找资料的过程中发现,不同版本的API有不少的差异,算是踩了一些坑。如果你使用其他版本的redis.clients遇到代码无法运行的时候,可以直接翻看源码查看相关参数类型。

Maven依赖:

代码语言:javascript
复制
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.2.3</version>
</dependency>

Gradle依赖:

代码语言:javascript
复制
// https://mvnrepository.com/artifact/redis.clients/jedis
implementation group: 'redis.clients', name: 'jedis', version: '4.2.3'

Redis server版本:Redis 6.2.5

XADD - 添加消息到末尾

如果key对应的队列不存在,则会自动创建。

首先我们需要创建一个redis.clients.jedis.params.XAddParams,顾名思义就是查询参数,这里面有重要的参数:redis.clients.jedis.params.XAddParams#maxLen表示设置队列的长度,但是不常用。语法如下:

def len = XAddParams.xAddParams()

xadd API使用方式如下:

代码语言:javascript
复制
    public static void main(String[] args) {
        def base = new RedisBase("127.0.0.1", 6379)
        Jedis jedis = base.getJedis()
        def len = XAddParams.xAddParams()
        def map = new HashMap<Integer, String>()
        map.put("FunTester", Time.getDate() + TAB + 325)
            jedis.xadd("fun", len, map)
        jedis.close()

    }

XTRIM - 对流进行修剪,限制长度

这个API就是设置队列长度。使用方式也非常简单。

代码语言:javascript
复制
    public static void main(String[] args) {
        def base = new RedisBase("127.0.0.1", 6379)
        Jedis jedis = base.getJedis()
        def xtrim = jedis.xtrim("fun", XTrimParams.xTrimParams().maxLen(10))
        output(xtrim)
        jedis.close()

    }

返回值是丢弃的消息的数量。

XDEL - 删除消息

这个就是删除某个消息,使用更简单了。

代码语言:javascript
复制
    public static void main(String[] args) {
        def base = new RedisBase("127.0.0.1", 6379)
        Jedis jedis = base.getJedis()
        jedis.xdel("fun",new StreamEntryID(1653129389004,1))
        jedis.close()

    }

XLEN - 获取流包含的元素数量,即消息长度

话不多说了,使用如下:

代码语言:javascript
复制
jedis.xlen("fun")

XREAD - 以阻塞或非阻塞方式获取消息列表

这个要着重介绍一下,因为我用的就是这个,首先我们需要创建一个redis.clients.jedis.params.XReadParams,这里有两个参数:redis.clients.jedis.params.XReadParams#countredis.clients.jedis.params.XReadParams#block。前者控制返回数量,后者控制阻塞时间,如果时间小于0则认为不阻塞,等于0则一直会阻塞,小于0会报错。不设置该参数责任无非阻塞模式。PS:数量不足不会造成阻塞。示例如下:

代码语言:javascript
复制
        def block = XReadParams.xReadParams().count(3).block(1000)

还有我们需要redis.clients.jedis.Jedis#xread(redis.clients.jedis.params.XReadParams, java.util.Map<java.lang.String,redis.clients.jedis.StreamEntryID>)第二个参数,这里常用的两种:

代码语言:javascript
复制
        Map<String, StreamEntryID> entry = ["fun": new StreamEntryID()]//获取历史消息
        Map<String, StreamEntryID> entry = ["fun": StreamEntryID.LAST_ENTRY]//获取在请求之后添加的消息

遍历消息:

代码语言:javascript
复制
        List<Map.Entry<String, List<StreamEntry>>> xread = jedis.xread(block, entry)
        output(xread.size())
        Map.Entry<String, List<StreamEntry>> get = xread.get(0)
        def value = get.getValue()
        value.each {
            println(it.getID())
            println(it.getFields().get("FunTester"))
        }

控制台响应如下:

代码语言:javascript
复制
16:40:56.065 main redis连接池IP:127.0.0.1,端口:6379,超时设置:5000
16:40:56.280 main 1
1653725282325-0
2022-05-28 16:08:02 325
1653725282325-1
2022-05-28 16:08:02 325
1653725282325-2
2022-05-28 16:08:02 325

XRANGE - 获取消息列表,会自动过滤已经删除的消息

这个API获取某个范围内的消息,有个startend的参数,可以传String类型的消息ID,也可以传redis.clients.jedis.StreamEntryID,方法重载的比较多,有兴趣可以翻一翻源码。

代码语言:javascript
复制
jedis.xrange("fun", "1653129389045-0", "1653129389047-0")

后面会对Redis stream API进行性能测试,欢迎继续关注FunTester。

Have Fun ~ Tester !

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-06-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 FunTester 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 准备工作
    • 依赖
    • XADD - 添加消息到末尾
    • XTRIM - 对流进行修剪,限制长度
    • XDEL - 删除消息
    • XLEN - 获取流包含的元素数量,即消息长度
    • XREAD - 以阻塞或非阻塞方式获取消息列表
    • XRANGE - 获取消息列表,会自动过滤已经删除的消息
    • Have Fun ~ Tester !
    相关产品与服务
    云数据库 Redis
    腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档