多姿势扩展 Redis 命令

一、业务场景

空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。

二、为何选择Redis

一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。

三、原生能力探究

键空间通知

键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:

  • key过期并不保证立即删除,Redis只会每次执行server.c:databasesCron时随机删除若干key,大量key同时过期无法保证时效;
  • Pub/Sub机制不保证通知送达,若client掉线则通知丢失;
  • 若多个client同时订阅,则都会收到通知,导致重复处理。

基于原生ZSET

ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
   2) "2"
2) (integer) 1

或者使用pipelining,如:

$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1

但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。

由于使用原生Redis无法满足需求,我们决定扩展Redis命令。

四、多姿势命令扩展

官方势

LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。

LUA脚本:

local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
    redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
    return rs
end;
return {}

client生成命令:

redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));

缺点是:

  • 使用lua脚本有额外学习成本
  • 实现在客户端,无法很好的复用
  • 使用lua后要做好运维工作,配置脚本超时,注意脚本缓存内存占用

暴力势

改源码,加一个命令。我们较早上线的一个服务使用了该方案。

/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
    //......
    {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
    {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/

/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
	robj *key = c->argv[1];
	robj *zobj;
	int keyremoved = 0;
	unsigned long deleted = 0;
	long start = 0, end = 0, llen = 0;

	/* for deletion */
	unsigned char *zleptr, *zlsptr;

	/* for addReply */
	unsigned char zlvstr[128];
	unsigned int zlvlen = 0;
	long long zlvlong = 0;
	robj *slele;
	double node_score;

	/* Step 1: Lookup & range sanity checks if needed. */
	if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
		checkType(c,zobj,OBJ_ZSET)) return;

	llen = zsetLength(zobj);
	if (end >= llen) end = llen-1;

	if (start > end || start >= llen) {
		return;
	}

	/* Step 2: Get value of the node will be remove */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		unsigned char *zl = zobj->ptr;
		unsigned char *vstr;

		if (reverse)
			zleptr = ziplistIndex(zl,-2-(2*start));
		else
			zleptr = ziplistIndex(zl,2*start);

		serverAssertWithInfo(c,zobj,zleptr != NULL);
		zlsptr = ziplistNext(zl,zleptr);

		serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
		serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));

		/* copy the result, sice the node will be delete before addReply */
		node_score = zzlGetScore(zlsptr);
		if (vstr)
			strncpy((char *)zlvstr, (char *)vstr, zlvlen);
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;
		zskiplist *zsl = zs->zsl;
		zskiplistNode *ln;

		/* Check if starting point is trivial, before doing log(N) lookup. */
		if (reverse) {
			ln = zsl->tail;
		} else {
			ln = zsl->header->level[0].forward;
		}

		serverAssertWithInfo(c,zobj,ln != NULL);
		slele = ln->obj;
		incrRefCount(slele); /* MUST call decrRefCount to free mem */
		node_score = ln->score;
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 3: Check if condition satisfied. */
	if (condition) {
		double condscore = 0;
		if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
			!= C_OK) goto cleanup;

		if (!reverse && condscore < node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
		if (reverse && condscore > node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
	}

	/* Step 4: Perform the deletion operation. */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		/* delete by ptr */
		serverAssertWithInfo(c,zobj,zleptr != NULL);
        zobj->ptr = zzlDelete(zobj->ptr,zleptr);
        deleted = 1;
        /* delete by range */
		/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/

		if (zzlLength(zobj->ptr) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;

		/* delete by ptr */
		/*serverAssertWithInfo(c,zobj,slele != NULL);
		serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
		dictDelete(zs->dict,slele);
		deleted = 1;*/
		/* delete by range */
		deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);

		if (htNeedsResize(zs->dict)) dictResize(zs->dict);
		if (dictSize(zs->dict) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 5: Notifications and reply. */
	if (deleted) {
		signalModifiedKey(c->db,key);
		notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
		if (keyremoved)
			notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
		server.dirty += deleted;
	}

	/* Step 6: Return the result in form of a multi-bulk reply */
	if (deleted) {
		addReplyMultiBulkLen(c, 2); /* at most one element with score */
		if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
			if (zlvlen == 0)
				addReplyBulkLongLong(c,zlvlong);
			else
				addReplyBulkCBuffer(c,zlvstr,zlvlen);
			addReplyDouble(c,node_score);
		} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
			addReplyBulk(c,slele);
			addReplyDouble(c,node_score);
		}
	} else {
		addReply(c,shared.emptymultibulk);
	}

cleanup:;
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
		decrRefCount(slele);
}

void zlpopifCommand(client *c) {
	zpopGenericCommand(c, 0, 1);
}

void zrpopifCommand(client *c) {
	zpopGenericCommand(c, 1, 1);
}

缺点是:后续官方更新都需要改代码。

时髦势

使用[Redis 4.0模块实现。此处是GitHub传送门。

相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。

五、修改源码、实现模块后一些问题

1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。

2 . 命令写入AOF和从机的时机:

  • 对于3.2.X使用LUA法,默认复制脚本本身,但可以使Redis仅复 制导致变更的命令而非整个命令,参考脚本中有关”Replicating commands instead of scripts”和”Selective replication of commands”的内容。
  • 对于3.2.X版本修改源码法,在server.c:call中,仅当有变更设置dirty变量值大于0时,才会触发命令传播,因此如果命令没有成功pop元素将不会产生命令传播。
  • 对于4.0 modules,我们的实现中使用了低级API,则需要实现中根据需要调用RedisModule_ReplicateVerbatim复制命令。

3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏恰同学骚年

.NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

  “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“消息队列”是在消息的传...

5055
来自专栏菩提树下的杨过

老生常谈:利用Membership实现SSO(单点登录)

虽然有一些现成的第三方解决方案比如:OpenID,Passport,SpaceCard等都还不错,但是要么就是收费的(passport),要么就是有点用不习惯(...

1905
来自专栏黑泽君的专栏

day51_BOS项目_03

将上面的js文件引入所需要的jsp页面中,本例以index.jsp为例 /bos19/WebContent/WEB-INF/pages/common/inde...

651

Centry 7上的Garry's Mod

Garry's Mod可以完全控制和修改视频游戏引擎——起源引擎。你几乎可以使用Garry's Mod制作任何你想要的游戏。架设Garry's Mod服务器是在...

933
来自专栏云计算

使用HyperForm自动配置虚拟机(第2部分)

原文地址:https://dzone.com/articles/automated-self-service-provisioning-of-vms-using...

2666
来自专栏小狼的世界

PHP中如何处理时区

默认安装的LAMP环境中,时区默认设置在GMT时间,所以一般得到的时间都会比我们电脑中显示的时间早八个小时(假设你的时间设置正确,并且时区是在东八区)。PHP提...

1572
来自专栏逸鹏说道

NET跨平台:在Ubuntu下搭建ASP.NET 5开发环境

0x00 写在前面的废话 年底这段时间实在太忙了,各种事情都凑在这个时候,没时间去学习自己感兴趣的东西,所以博客也好就没写了。最近工作上有个小功能要做成Web应...

2713
来自专栏散尽浮华

Saltstack自动化操作记录(1)-环境部署

早期运维工作中用过稍微复杂的Puppet,下面介绍下更为简单实用的Saltstack自动化运维的使用。 Saltstack知多少 Saltstack是一种全新的...

23610
来自专栏云计算教程系列

如何在Ubuntu 14.04上设置时区和NTP同步[快速入门]

正确设置服务器的时钟和时区对于确保分布式系统的正常运行和维护准确的日志时间戳至关重要。本教程将向您展示如何配置NTP时间同步并在Ubuntu 14.04服务器上...

3250
来自专栏吴柯的运维笔记

Linux下监控软件Zabbix安装部署教学

“每个理性的IT人士都置顶了吴柯的运维笔记” 1.部署LNMP环境 安装php Zabbix 3.0对PHP的要求最低为5.4,而CentOS6默认为5.3.3...

40013

扫码关注云+社区