当数据库更新后,最好相关缓存也需要更新。一种简单的办法是删除相应的redis key。但是如果redis_key中包含变量呢?
比如查询倒数N的任务数据,它的key是这样的:
task:{uid}:{count} # task:1:2, 用户1的倒数第2条任务
之前我使用 keys task:1:*
来找到相关的key的缓存,然后删除之。
但是keys 调用会阻塞进程。即使使用SCAN,也很Ugly。所以设计一个简单的缓存更新策略就很必要了。
以上面的例子为示例:
ut:{uid}
作为user task的版本标识(一般来说,划分以数据库的表为准)。cache:{ 'data': xxx, 'version': timestamp }
ut:1
的值。显然,ut:1
的值代表数据库的更新时间。对比cache['version']
和 ut:1
。如果cache['version']
小于 ut:1
,说明缓存落后于数据库。此时,刷新缓存。并更新cache['version']
为最新timestamp。
完成上面操作后,实作中,把所有和user task
表相关的缓存都绑定到ut:{uid}
这个key上。只要在更新表时更新这个key。就能引发所有的缓存惰性更新。
def redis_cached_auto_refresh(prefix, timeout=30, key_args=[], version_key='', version_args=[]):
"""
cache一个页面。prefix是 key。key_args决定key中动态附加的入参
version_key是一个公共的版本标识。当这个版本标识变化,将引发关联的所有缓存更新,
所以确保version_key和约定的一致。
"""
def decorator(f):
@wraps(f)
async def run(request, *args, **kwargs):
key = prefix
# 绑定的过期策略键
v_key = version_key
if args:
for item in key_args:
key += ":{}".format(args[item])
for item in version_args:
v_key += ":{}".format(args[item])
logging.debug("call cache key = {}, bind_key {} ".format(key, v_key))
async with request.app.redis_pool.get() as redis:
try:
pipe = redis.pipeline()
pipe.get(key) # response
pipe.get(v_key) # bind_key_version
ret = await pipe.execute()
response = ret[0]
bind_key_version = ret[1]
# 取公共key(时间戳)作为比较版本
if bind_key_version:
bind_key_version = int(bind_key_version.decode())
else:
bind_key_version = 0
if response:
response = pickle.loads(response)
this_time_version = response['version']
# 当缓存的数据新于公共标识key的版本,返回数据。
# 当缓存的数据旧于公共标识key的版本,重新调用函数取数据
if this_time_version >= bind_key_version:
logging.debug("redis_cached hit cache, key:{}".format(key))
return response['data']
except Exception as e:
logging.error("redis_cached cache error, key:{} {}".format(key, e))
nonlocal timeout
response = await f(request, *args, **kwargs)
if response:
response_cached = {
'data': response,
'version': int(time.time())
}
response_cached = pickle.dumps(response_cached, protocol=pickle.HIGHEST_PROTOCOL)
logging.debug("redis_cached will set cache, key:{} timeout:{}"
.format(key, timeout))
if options.dev is True:
logging.info("in debug mode, timeout = 60s")
timeout = 60
await redis.setex(key, timeout, response_cached)
return response
return run
return decorator