专栏首页idba怎么用 Redis 快速实现一个延迟队列?

怎么用 Redis 快速实现一个延迟队列?

在后端服务中,经常有这样一种场景,写数据库操作在异步队列中执行,且这个异步队列是多进程运行的,这时如果对同一资源进行写库操作,很有可能产生数据被覆盖等问题,于是就需要业务层在更新数据库之前进行加锁,这样保证在更改同一资源时,没有其他更新操作干涉,保证数据一致性。

但如果在更新前对数据库更新加锁,那此时又来了新的更新数据库的请求,但这个更新操作不能丢弃掉,需要延迟执行,那这就需要添加到延迟队列中,延迟执行。

那么如何实现一个延迟队列?

利用RedisSortedSetString这两种结构,就可以轻松实现。

具体实现

# coding: utf8

"""Delay Queue"""

import json
import time
import uuid

import redis


class DelayQueue(object):

    """延迟队列"""

    QUEUE_KEY = 'delay_queue'
    DATA_PREFIX = 'queue_data'

    def __init__(self, conf):
        host, port, db = conf['host'], conf['port'], conf['db']
        self.client = redis.Redis(host=host, port=port, db=db)

    def push(self, data):
        """push

        :param data: data
        """
        # 唯一ID
        task_id = str(uuid.uuid4())
        data_key = '{}_{}'.format(self.DATA_PREFIX, task_id)
        # save string
        self.client.set(data_key, json.dumps(data))
        # add zset(queue_key=>data_key,ts)
        self.client.zadd(self.QUEUE_KEY, data_key, int(time.time()))
        
    def pop(self, num=5, previous=3):
        """pop多条数据

        :param num: pop多少个
        :param previous: 获取多少秒前push的数据
        """
        # 取出previous秒之前push的数据
        until_ts = int(time.time()) - previous
        task_ids = self.client.zrangebyscore(
            self.QUEUE_KEY, 0, until_ts, start=0, num=num)
        if not task_ids:
            return []

        # 利用删除的原子性,防止并发获取重复数据
        pipe = self.client.pipeline()
        for task_id in task_ids:
            pipe.zrem(self.QUEUE_KEY, task_id)
        data_keys = [
            data_key
            for data_key, flag in zip(task_ids, pipe.execute())
            if flag
        ]
        if not data_keys:
            return []
        # load data
        data = [
            json.loads(item)
            for item in self.client.mget(data_keys)
        ]
        # delete string key
        self.client.delete(*data_keys)
        return data

实现思路

push

push数据时,执行如下几步:

  • 生成一个唯一key,这里使用uuid4生成(uuid4是根据随机数生成的,重复概率非常小)
  • 把数据序列化后存入这个唯一keyString结构中
  • 把这个唯一key加到SortedSet中,score是当前时间戳

这里利用SortedSet记录添加数据的时间,便于在获取时根据时间获取之前的数据,达到延迟的效果。 而真正的数据则存放在String结构中,等获取时先拿到数据的key再获取真正的数据。

这里可能有人会疑问,为什么不把真正的数据放到SortedSetname中?

  • 把数据放入name中可能会产生瞬间写入相同数据导致数据多条变一条的情况
  • 把数据序列化放到SortedSetname中有些过大,不太符合使用习惯

pop

pop是可以获取多条数据的,上面的代码默认是获取延迟队列中3秒前的5条数据,具体思路如下:

  • 计算previous秒前的时间戳,使用SortedSetzrangebysocre方法获取previous秒之前添加的唯一key
  • 如果SortedSet中有数据,则利用Redis删除的原子性,使用zrem依次删除SortedSet的元素,如果删除成功,则使用,防止多进程并发执行此方法,拿到相同的数据
  • 那到可用的唯一key,从String中获取真正的数据即可

这里最重要的是第二步,在拿出SortedSet的数据后,一定要防止其他进程并发获取到相同的数据,所以在这里使用zrem依次删除元素,保证只有删除成功的进程才能使用这条数据。

使用

# coding: utf8

import time

from delay import DelayQueue

redis_conf = {'host': '127.0.0.1', 'port': 6379, 'db': 0}

# 构造延迟队列对象
queue = DelayQueue(redis_conf)

# push 20条数据
for i in range(20):
    item = {'user': 'user-{}'.format(i)}
    queue.push(item)
    
# 从延迟队列中马上获取10条数据
data = queue.pop(num=10)
# 刚添加的马上获取是获取不到的
assert len(data) == 0

# 休眠10秒
time.sleep(10)
# 从延迟队列中获取10条数据
data = queue.pop(num=10)
assert len(data) == 10

# 从延迟队列中获取截止到5秒之前添加的10条数据
data = queue.pop(num=10, previous=5)
assert len(data) == 10

使用就比较简单了,在实际使用过程中,每次在处理正常队列时,通过上面的方法获取一下延迟队列的数据,如果延迟队列中有数据,那么按照业务正常处理就可以了,这样就达到了数据延迟处理的效果。

来源 | http://kaito-kidd.com/2016/12/26/delay-queue-based-on-redis/

-The End-

本文分享自微信公众号 - yangyidba(yangyidba),作者:Kaito

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-10-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用innobackup 2.4遇到的问题

    一 前言 Percona公司发布 innobackup 2.4 版本已经很久了,增加了新的特性比如支持非Innodb表备份,指定 --safe-slave-...

    用户1278550
  • 笔记|见识

    最近阅读了 吴军的《见识》这本书,收获很多,感触最深的是 "伪工作者"的概念。本文记录自己的读书笔记,分享给大家。

    用户1278550
  • xfs vs ext4 性能压测对比

    最近忙着给YOUZAN的数据库服务器升级系统版本,从centos6 升级到centos7。centos/redhat 7 默认将文件系统设置为xfs。咨询了很多...

    用户1278550
  • [Python3 开发技巧]·如何打乱字典中多个对应数组

    当我们把数个对应数组保存到字典中,在我们读取的时候这些数据会按照我们保存的顺序读取出来。如果我们需要打乱顺序,但不改变对应数组的关系时,例如原先位置0对应的各个...

    小宋是呢
  • 研报复制(三):基于相对强弱指标的大小盘轮动

    A股市场上存在着明显的大小盘轮动的现象,一段时间大盘表现强势,一段时间小盘表现强势,所谓二八轮动。这种现象提供了构建大小盘轮动策略的可能,目前常见的两种构建大小...

    量化小白
  • 漫谈可视化Prefuse(六)---改动源码定制边粗细

    可视化一路走来,体会很多;博客一路写来,收获颇丰;代码一路码来,思路越来越清晰。终究还是明白了一句古话:纸上得来终觉浅,绝知此事要躬行。   跌跌撞撞整合了个...

    JackieZheng
  • data自定义属性在jQuery中的用法

    (1)如果在HTML文档中设置的data-自定义属性的单个字符串的名称的属性中若有大写值,在js文件中获取时只能用小写的形式获取。如:

    kirin
  • 寻找和为定值的两个数

    输入一个整数数组和一个整数,在数组中查找一对数,满足他们的和正好是输入的那个整数,如果有多对数的和等于输入的整数,则全部输出,要求输出的结果中不应该出现重复,如...

    陌无崖
  • 为什么vue中的data必须是一个函数?

    object是引用类型,如果不用function返回,每个组件的data都是内存的同一个地址,一个数据改变了其他也改变了。

    用户3258338
  • datatables,表格

    windseek

扫码关注云+社区

领取腾讯云代金券