前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >玩转redis-延时消息队列

玩转redis-延时消息队列

作者头像
lpxxn
发布2020-04-16 14:48:07
1K0
发布2020-04-16 14:48:07
举报
文章被收录于专栏:技术之路技术之路

上一篇基于redis的list实现了一个简单的消息队列:玩转redis-简单消息队列

源码地址 使用demo

产品经理经常说的一句话,我们不光要有X功能,还要Y功能,这样客户才能更满意。同样的,只有简单消息队列是不够的,还要有延时消息队列才能算是一个完整的消息队列。

看看redis的命令,放眼望去,的有序集合(sorted set)就是一个很好用的命令,完全可以用他做一个延时消息队列

redis有序集合(sorted set)

redis有序集合,每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。

有序集合的成员是唯一的,但分数(score)却可以重复。

简单操作

添加数据

代码语言:javascript
复制
127.0.0.1:6379> ZADD testSet1 5 a
(integer) 1
127.0.0.1:6379> ZADD testSet1 1 b 8 c 7 d
(integer) 3

读取

代码语言:javascript
复制
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 3
1) "b"
127.0.0.1:6379> ZRANGEBYSCORE testSet1 0 5
1) "b"
2) "a"

也可以把score打出来

代码语言:javascript
复制
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf 5 WITHSCORES
1) "b"
2) "1"
3) "a"
4) "5"

查出所有的数据

代码语言:javascript
复制
127.0.0.1:6379> ZRANGEBYSCORE testSet1 -inf inf
1) "b"
2) "a"
3) "d"
4) "c"

删除数据

代码语言:javascript
复制
ZREMRANGEBYSCORE testSet1 0 2

延时队列的实现思路

总体的思路很简单,就是每一个valuescore保存的是时间,也就是说,在添加一个元素时他的score是当前时间+延时的时间。轮循获取数据时,查找小于或等于当前时间的数据项,就是具体的延时消息。

还有一个问题,就是ZRANGEBYSCORElistpop不同,pop是取出元素并且会把元素在list中删除。ZRANGEBYSCORE只会取出数据不会把数据从sorted set中删除。解决方法1,利用redis事务,先ZRANGEBYSCORE取出数据,然后再用ZREMRANGEBYSCORE 把数据删除。

具体实现-code

添加延时消息,参数delay就是我们要延时多久:

代码语言:javascript
复制
func (p *Producer) PublishDelayMsg(topicName string, body []byte, delay time.Duration) error {
	if delay <= 0 {
		return errors.New("delay need great than zero")
	}
	tm := time.Now().Add(delay)
	msg := NewMessage("", body)
	msg.DelayTime = tm.Unix()

	sendData, _ := json.Marshal(msg)
	return p.redisCmd.ZAdd(topicName+zsetSuffix, redis.Z{Score: float64(tm.Unix()), Member: string(sendData)}).Err()
}

使用,比如我们想过1秒再处理

代码语言:javascript
复制
producer.PublishDelayMsg(topicName, body, time.Second)

读取消息并处理

这就比较简单了,就是在一个ticker里循环读取小于或等于当前时间的数据:

代码语言:javascript
复制
func (s *consumer) startGetDelayMessage() {
	go func() {
		ticker := time.NewTicker(s.options.RateLimitPeriod)
		defer func() {
			log.Println("stop get delay message.")
			ticker.Stop()
		}()
		topicName := s.topicName + zsetSuffix
		for {
			currentTime := time.Now().Unix()
			select {
			case <-s.ctx.Done():
				log.Printf("context Done msg: %#v \n", s.ctx.Err())
				return
			case <-ticker.C:
				var valuesCmd *redis.ZSliceCmd
				_, err := s.redisCmd.TxPipelined(func(pip redis.Pipeliner) error {
					valuesCmd = pip.ZRangeWithScores(topicName, 0, currentTime)
					pip.ZRemRangeByScore(topicName, "0", strconv.FormatInt(currentTime, 10))
					return nil
				})
				if err != nil {
					log.Printf("zset pip error: %#v \n", err)
					continue
				}
				rev := valuesCmd.Val()
				for _, revBody := range rev {
					msg := &Message{}
					json.Unmarshal([]byte(revBody.Member.(string)), msg)
					if s.handler != nil {
						s.handler.HandleMessage(msg)
					}
				}
			}
		}
	}()
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-04-14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • redis有序集合(sorted set)
    • 简单操作
    • 延时队列的实现思路
      • 具体实现-code
      相关产品与服务
      云数据库 Redis
      腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档