多姿势扩展 Redis 命令

一、业务场景

空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。

二、为何选择Redis

一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。

三、原生能力探究

键空间通知

键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:

  • key过期并不保证立即删除,Redis只会每次执行server.c:databasesCron时随机删除若干key,大量key同时过期无法保证时效;
  • Pub/Sub机制不保证通知送达,若client掉线则通知丢失;
  • 若多个client同时订阅,则都会收到通知,导致重复处理。

基于原生ZSET

ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
   2) "2"
2) (integer) 1

或者使用pipelining,如:

$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1

但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。

由于使用原生Redis无法满足需求,我们决定扩展Redis命令。

四、多姿势命令扩展

官方势

LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。

LUA脚本:

local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
    redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
    return rs
end;
return {}

client生成命令:

redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));

缺点是:

  • 使用lua脚本有额外学习成本
  • 实现在客户端,无法很好的复用
  • 使用lua后要做好运维工作,配置脚本超时,注意脚本缓存内存占用

暴力势

改源码,加一个命令。我们较早上线的一个服务使用了该方案。

/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
    //......
    {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
    {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/

/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
	robj *key = c->argv[1];
	robj *zobj;
	int keyremoved = 0;
	unsigned long deleted = 0;
	long start = 0, end = 0, llen = 0;

	/* for deletion */
	unsigned char *zleptr, *zlsptr;

	/* for addReply */
	unsigned char zlvstr[128];
	unsigned int zlvlen = 0;
	long long zlvlong = 0;
	robj *slele;
	double node_score;

	/* Step 1: Lookup & range sanity checks if needed. */
	if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
		checkType(c,zobj,OBJ_ZSET)) return;

	llen = zsetLength(zobj);
	if (end >= llen) end = llen-1;

	if (start > end || start >= llen) {
		return;
	}

	/* Step 2: Get value of the node will be remove */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		unsigned char *zl = zobj->ptr;
		unsigned char *vstr;

		if (reverse)
			zleptr = ziplistIndex(zl,-2-(2*start));
		else
			zleptr = ziplistIndex(zl,2*start);

		serverAssertWithInfo(c,zobj,zleptr != NULL);
		zlsptr = ziplistNext(zl,zleptr);

		serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
		serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));

		/* copy the result, sice the node will be delete before addReply */
		node_score = zzlGetScore(zlsptr);
		if (vstr)
			strncpy((char *)zlvstr, (char *)vstr, zlvlen);
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;
		zskiplist *zsl = zs->zsl;
		zskiplistNode *ln;

		/* Check if starting point is trivial, before doing log(N) lookup. */
		if (reverse) {
			ln = zsl->tail;
		} else {
			ln = zsl->header->level[0].forward;
		}

		serverAssertWithInfo(c,zobj,ln != NULL);
		slele = ln->obj;
		incrRefCount(slele); /* MUST call decrRefCount to free mem */
		node_score = ln->score;
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 3: Check if condition satisfied. */
	if (condition) {
		double condscore = 0;
		if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
			!= C_OK) goto cleanup;

		if (!reverse && condscore < node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
		if (reverse && condscore > node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
	}

	/* Step 4: Perform the deletion operation. */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		/* delete by ptr */
		serverAssertWithInfo(c,zobj,zleptr != NULL);
        zobj->ptr = zzlDelete(zobj->ptr,zleptr);
        deleted = 1;
        /* delete by range */
		/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/

		if (zzlLength(zobj->ptr) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;

		/* delete by ptr */
		/*serverAssertWithInfo(c,zobj,slele != NULL);
		serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
		dictDelete(zs->dict,slele);
		deleted = 1;*/
		/* delete by range */
		deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);

		if (htNeedsResize(zs->dict)) dictResize(zs->dict);
		if (dictSize(zs->dict) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 5: Notifications and reply. */
	if (deleted) {
		signalModifiedKey(c->db,key);
		notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
		if (keyremoved)
			notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
		server.dirty += deleted;
	}

	/* Step 6: Return the result in form of a multi-bulk reply */
	if (deleted) {
		addReplyMultiBulkLen(c, 2); /* at most one element with score */
		if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
			if (zlvlen == 0)
				addReplyBulkLongLong(c,zlvlong);
			else
				addReplyBulkCBuffer(c,zlvstr,zlvlen);
			addReplyDouble(c,node_score);
		} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
			addReplyBulk(c,slele);
			addReplyDouble(c,node_score);
		}
	} else {
		addReply(c,shared.emptymultibulk);
	}

cleanup:;
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
		decrRefCount(slele);
}

void zlpopifCommand(client *c) {
	zpopGenericCommand(c, 0, 1);
}

void zrpopifCommand(client *c) {
	zpopGenericCommand(c, 1, 1);
}

缺点是:后续官方更新都需要改代码。

时髦势

使用[Redis 4.0模块实现。此处是GitHub传送门。

相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。

五、修改源码、实现模块后一些问题

1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。

2 . 命令写入AOF和从机的时机:

  • 对于3.2.X使用LUA法,默认复制脚本本身,但可以使Redis仅复 制导致变更的命令而非整个命令,参考脚本中有关”Replicating commands instead of scripts”和”Selective replication of commands”的内容。
  • 对于3.2.X版本修改源码法,在server.c:call中,仅当有变更设置dirty变量值大于0时,才会触发命令传播,因此如果命令没有成功pop元素将不会产生命令传播。
  • 对于4.0 modules,我们的实现中使用了低级API,则需要实现中根据需要调用RedisModule_ReplicateVerbatim复制命令。

3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Python攻城狮

Django实战(一)- 搭建简单的博客系统

37620
来自专栏大内老A

[原创]谈谈基于Kerberos的Windows Network Authentication - Part II

四、引入Ticket Granting  Service 通过上面的介绍,我们发现Kerberos实际上一个基于Ticket的认证方式。Client想要获取Se...

22490
来自专栏小狼的世界

PHP中如何处理时区

默认安装的LAMP环境中,时区默认设置在GMT时间,所以一般得到的时间都会比我们电脑中显示的时间早八个小时(假设你的时间设置正确,并且时区是在东八区)。PHP提...

29620
来自专栏肖洒的博客

模拟登陆我终于还是进了

爬了这么久的教务处,终于还是绕过了千山万水登进来了。 这段时间准备期末考,忙,好久不更博。 年终总结也没有写。 是时候开更啦!

12420
来自专栏葡萄城控件技术团队

Asp.Net MVC4入门指南(7):给电影表和模型添加新字段

在本节中,您将使用Entity Framework Code First来实现模型类上的操作。从而使得这些操作和变更,可以应用到数据库中。 默认情况下,就像您在...

205100
来自专栏Python研发

django-rest-framework登陆认证

53220

Centry 7上的Garry's Mod

Garry's Mod可以完全控制和修改视频游戏引擎——起源引擎。你几乎可以使用Garry's Mod制作任何你想要的游戏。架设Garry's Mod服务器是在...

16430
来自专栏吴柯的运维笔记

Linux下监控软件Zabbix安装部署教学

“每个理性的IT人士都置顶了吴柯的运维笔记” 1.部署LNMP环境 安装php Zabbix 3.0对PHP的要求最低为5.4,而CentOS6默认为5.3.3...

467130
来自专栏拂晓风起

cocos2d-js 调试办法 断点调试 Android真机调试

21920
来自专栏晓晨的专栏

Autofac高级用法之动态代理

21230

扫码关注云+社区

领取腾讯云代金券