多姿势扩展 Redis 命令

一、业务场景

空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。

二、为何选择Redis

一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。

三、原生能力探究

键空间通知

键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:

  • key过期并不保证立即删除,Redis只会每次执行server.c:databasesCron时随机删除若干key,大量key同时过期无法保证时效;
  • Pub/Sub机制不保证通知送达,若client掉线则通知丢失;
  • 若多个client同时订阅,则都会收到通知,导致重复处理。

基于原生ZSET

ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
   2) "2"
2) (integer) 1

或者使用pipelining,如:

$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1

但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。

由于使用原生Redis无法满足需求,我们决定扩展Redis命令。

四、多姿势命令扩展

官方势

LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。

LUA脚本:

local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
    redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
    return rs
end;
return {}

client生成命令:

redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));

缺点是:

  • 使用lua脚本有额外学习成本
  • 实现在客户端,无法很好的复用
  • 使用lua后要做好运维工作,配置脚本超时,注意脚本缓存内存占用

暴力势

改源码,加一个命令。我们较早上线的一个服务使用了该方案。

/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
    //......
    {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
    {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/

/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
	robj *key = c->argv[1];
	robj *zobj;
	int keyremoved = 0;
	unsigned long deleted = 0;
	long start = 0, end = 0, llen = 0;

	/* for deletion */
	unsigned char *zleptr, *zlsptr;

	/* for addReply */
	unsigned char zlvstr[128];
	unsigned int zlvlen = 0;
	long long zlvlong = 0;
	robj *slele;
	double node_score;

	/* Step 1: Lookup & range sanity checks if needed. */
	if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
		checkType(c,zobj,OBJ_ZSET)) return;

	llen = zsetLength(zobj);
	if (end >= llen) end = llen-1;

	if (start > end || start >= llen) {
		return;
	}

	/* Step 2: Get value of the node will be remove */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		unsigned char *zl = zobj->ptr;
		unsigned char *vstr;

		if (reverse)
			zleptr = ziplistIndex(zl,-2-(2*start));
		else
			zleptr = ziplistIndex(zl,2*start);

		serverAssertWithInfo(c,zobj,zleptr != NULL);
		zlsptr = ziplistNext(zl,zleptr);

		serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
		serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));

		/* copy the result, sice the node will be delete before addReply */
		node_score = zzlGetScore(zlsptr);
		if (vstr)
			strncpy((char *)zlvstr, (char *)vstr, zlvlen);
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;
		zskiplist *zsl = zs->zsl;
		zskiplistNode *ln;

		/* Check if starting point is trivial, before doing log(N) lookup. */
		if (reverse) {
			ln = zsl->tail;
		} else {
			ln = zsl->header->level[0].forward;
		}

		serverAssertWithInfo(c,zobj,ln != NULL);
		slele = ln->obj;
		incrRefCount(slele); /* MUST call decrRefCount to free mem */
		node_score = ln->score;
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 3: Check if condition satisfied. */
	if (condition) {
		double condscore = 0;
		if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
			!= C_OK) goto cleanup;

		if (!reverse && condscore < node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
		if (reverse && condscore > node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
	}

	/* Step 4: Perform the deletion operation. */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		/* delete by ptr */
		serverAssertWithInfo(c,zobj,zleptr != NULL);
        zobj->ptr = zzlDelete(zobj->ptr,zleptr);
        deleted = 1;
        /* delete by range */
		/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/

		if (zzlLength(zobj->ptr) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;

		/* delete by ptr */
		/*serverAssertWithInfo(c,zobj,slele != NULL);
		serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
		dictDelete(zs->dict,slele);
		deleted = 1;*/
		/* delete by range */
		deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);

		if (htNeedsResize(zs->dict)) dictResize(zs->dict);
		if (dictSize(zs->dict) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 5: Notifications and reply. */
	if (deleted) {
		signalModifiedKey(c->db,key);
		notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
		if (keyremoved)
			notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
		server.dirty += deleted;
	}

	/* Step 6: Return the result in form of a multi-bulk reply */
	if (deleted) {
		addReplyMultiBulkLen(c, 2); /* at most one element with score */
		if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
			if (zlvlen == 0)
				addReplyBulkLongLong(c,zlvlong);
			else
				addReplyBulkCBuffer(c,zlvstr,zlvlen);
			addReplyDouble(c,node_score);
		} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
			addReplyBulk(c,slele);
			addReplyDouble(c,node_score);
		}
	} else {
		addReply(c,shared.emptymultibulk);
	}

cleanup:;
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
		decrRefCount(slele);
}

void zlpopifCommand(client *c) {
	zpopGenericCommand(c, 0, 1);
}

void zrpopifCommand(client *c) {
	zpopGenericCommand(c, 1, 1);
}

缺点是:后续官方更新都需要改代码。

时髦势

使用[Redis 4.0模块实现。此处是GitHub传送门。

相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。

五、修改源码、实现模块后一些问题

1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。

2 . 命令写入AOF和从机的时机:

  • 对于3.2.X使用LUA法,默认复制脚本本身,但可以使Redis仅复 制导致变更的命令而非整个命令,参考脚本中有关”Replicating commands instead of scripts”和”Selective replication of commands”的内容。
  • 对于3.2.X版本修改源码法,在server.c:call中,仅当有变更设置dirty变量值大于0时,才会触发命令传播,因此如果命令没有成功pop元素将不会产生命令传播。
  • 对于4.0 modules,我们的实现中使用了低级API,则需要实现中根据需要调用RedisModule_ReplicateVerbatim复制命令。

3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java达人

oracle commit详解

它执行的时候,你不会有什么感觉。commit在数据库编程的时候很常用,当你执行DML操作时,数据库并不会立刻修改表中数据,这时你需要commit,数据库中的数据...

3419
来自专栏小樱的经验随笔

闲的无聊时候就手动写第一个漏洞扫描工具吧!

上课太无聊了,今天就用python写个漏洞扫描器玩玩,原理是先检测漏洞,在扫描备份,文件结果自动保存在当前目录 主要就是:信息获取、模拟攻击。 网络漏洞扫描对目...

5404
来自专栏惨绿少年

OpenStack云计算之路-Mitaka 版本

1.1 云计算简介 云计算(英语:cloud computing ),是一种基于互联网的计算方式,通过这种方式,共享的软硬件资源和信息可以按需求提供给计算机各种...

4258
来自专栏文渊之博

数据库副本的自动种子设定(自增长)

背景 在 SQL Server 2012 和 2014 中,初始化 SQL Server Always On 可用性组中的次要副本的唯一方法是使用备份、复制和还...

26711
来自专栏肖洒的博客

Web(二):Django概述

昨晚下了一晚的雨,早上研会的趣味运动会忙了一早上,下午是时候学习了。(白眼) 写在前面 之前看 learn python the hard way 推荐使用No...

762
来自专栏后端技术探索

当规模到亿级,MySQL是一个更好的NoSQL!

MySQL是一个更好的NoSQL数据库。当考虑到NoSQL的使用案例,比如对Key/Value键值存储来讲,MySQL在性能、易用性和稳定性方面更有意义。MyS...

821
来自专栏xingoo, 一个梦想做发明家的程序员

循序渐进,了解Hive是什么!

一直想抽个时间整理下最近的所学,断断续续接触hive也有半个多月了,大体上了解了很多Hive相关的知识。那么,一般对陌生事物的认知都会经历下面几个阶段: ...

2085
来自专栏杨建荣的学习笔记

Greenplum集群主机名问题及修复

昨天写了一篇Greenplum数据仓库迁移小记,看起来一起都在计划中,一切都在掌握中,今天早上的时候,统计组的同学反馈说写入GP的时候报了下面的错误。

912
来自专栏祝威廉

如何提高ElasticSearch 索引速度

这篇文章会讲述上面几个参数的原理,以及一些其他的思路。这些参数大体上是朝着两个方向优化的:

1093
来自专栏北京马哥教育

10个实用的Django建议

Django 作为一个杰出的Python开源框架,或许得不到和其它流行框架如Rails这样多的赞美,但是它和其他框架一样精炼,非常注重DRY(Don’t Rep...

3698

扫码关注云+社区