多姿势扩展 Redis 命令

一、业务场景

空间宠物业务需要实现一个定时消息触发组件,如在特定时刻给用户推送收集糖果通知、biubiu球功能定时回收用户丢弃的球等。可见,消息只有在特定时间到达才能被处理。同时,消息的产生是无序的,即后产生的消息被处理的时间可能早于先产生的消息。

二、为何选择Redis

一些著名的消息队列组件,如ActiveMQ ,本身支持消息延迟投递,为何本文选择Redis呢?一方面是引入新组建有学习、运维、接入成本,而组内已积累一定Redis开发运维经验;另一方面则是基于Redis实现这样一个组件难度也不大。所以决定采用Redis。

三、原生能力探究

键空间通知

键空间通知可以在消息到达时插入一个key,并给key设置过期时间,键过期后会通过特定频道发布键过期通知,订阅方可收到通知并处理事件。但问题在于:

  • key过期并不保证立即删除,Redis只会每次执行server.c:databasesCron时随机删除若干key,大量key同时过期无法保证时效;
  • Pub/Sub机制不保证通知送达,若client掉线则通知丢失;
  • 若多个client同时订阅,则都会收到通知,导致重复处理。

基于原生ZSET

ZSET可在消息插入时根据score排序,从而使最早的消息排在最前面。但ZSET没有提供POP方法,取得第一个元素和删除需要执行两个命令。为保证原子性,可以采用事务,如:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> ZRANGE myzset 0 0 WITHSCORES
QUEUED
127.0.0.1:6379> ZREMRANGEBYRANK myzset 0 0
QUEUED
127.0.0.1:6379> EXEC
1) 1) "b"
   2) "2"
2) (integer) 1

或者使用pipelining,如:

$ (printf "ZRANGE myzset 0 0 WITHSCORES\r\nZREMRANGEBYRANK myzset 0 0\r\n"; sleep 1) | nc localhost 6379
*2
$1
c
$1
3
:1

但问题在于,虽然可顺序取出消息,但无法只在时间到达后取出消息。因此需要client端实现逻辑等待时间到达再推送。同时,消息产生是无序的,如果取得了一个10分钟后处理的消息,在此期间又产生了一个需要在5分钟后处理的消息,逻辑将变得复杂。

由于使用原生Redis无法满足需求,我们决定扩展Redis命令。

四、多姿势命令扩展

官方势

LUA脚本是利用3.X版官方特性实现命令扩展的途径。以下脚本将读出首元素,并与当前时间戳(以参数传入)比较,如果消息处理时间到达则删除消息并返回;所有操作将是原子的。目前我们线上服务使用该方案。

LUA脚本:

local rs = redis.call('ZRANGE', KEYS[1], '0', '0', 'WITHSCORES');
if table.getn(rs)<2 then return rs end;
if tonumber(rs[2]) < tonumber(ARGV[1]) then
    redis.call('ZREMRANGEBYRANK', KEYS[1], 0, 0);
    return rs
end;
return {}

client生成命令:

redisFormatCommand(&pCmd, "eval %s 1 %s %lld", szScript, szKey, (int64_t)time(NULL)));

缺点是:

  • 使用lua脚本有额外学习成本
  • 实现在客户端,无法很好的复用
  • 使用lua后要做好运维工作,配置脚本超时,注意脚本缓存内存占用

暴力势

改源码,加一个命令。我们较早上线的一个服务使用了该方案。

/* 需要在server.c中加入实现的命令:
struct redisCommand redisCommandTable[] = {
    //......
    {"zlpopif",zlpopifCommand,3,"w",0,NULL,1,1,1,0,0},
    {"zrpopif",zrpopifCommand,3,"w",0,NULL,1,1,1,0,0},
};
*/

/* 实现在t_zset.c: */
void zpopGenericCommand(client *c, int reverse, int condition) {
	robj *key = c->argv[1];
	robj *zobj;
	int keyremoved = 0;
	unsigned long deleted = 0;
	long start = 0, end = 0, llen = 0;

	/* for deletion */
	unsigned char *zleptr, *zlsptr;

	/* for addReply */
	unsigned char zlvstr[128];
	unsigned int zlvlen = 0;
	long long zlvlong = 0;
	robj *slele;
	double node_score;

	/* Step 1: Lookup & range sanity checks if needed. */
	if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
		checkType(c,zobj,OBJ_ZSET)) return;

	llen = zsetLength(zobj);
	if (end >= llen) end = llen-1;

	if (start > end || start >= llen) {
		return;
	}

	/* Step 2: Get value of the node will be remove */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		unsigned char *zl = zobj->ptr;
		unsigned char *vstr;

		if (reverse)
			zleptr = ziplistIndex(zl,-2-(2*start));
		else
			zleptr = ziplistIndex(zl,2*start);

		serverAssertWithInfo(c,zobj,zleptr != NULL);
		zlsptr = ziplistNext(zl,zleptr);

		serverAssertWithInfo(c,zobj,zleptr != NULL && zlsptr != NULL);
		serverAssertWithInfo(c,zobj,ziplistGet(zleptr,&vstr,&zlvlen,&zlvlong));

		/* copy the result, sice the node will be delete before addReply */
		node_score = zzlGetScore(zlsptr);
		if (vstr)
			strncpy((char *)zlvstr, (char *)vstr, zlvlen);
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;
		zskiplist *zsl = zs->zsl;
		zskiplistNode *ln;

		/* Check if starting point is trivial, before doing log(N) lookup. */
		if (reverse) {
			ln = zsl->tail;
		} else {
			ln = zsl->header->level[0].forward;
		}

		serverAssertWithInfo(c,zobj,ln != NULL);
		slele = ln->obj;
		incrRefCount(slele); /* MUST call decrRefCount to free mem */
		node_score = ln->score;
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 3: Check if condition satisfied. */
	if (condition) {
		double condscore = 0;
		if (getDoubleFromObjectOrReply(c,c->argv[2],&condscore,NULL)
			!= C_OK) goto cleanup;

		if (!reverse && condscore < node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
		if (reverse && condscore > node_score) {
			addReply(c,shared.emptymultibulk);
			goto cleanup;
		}
	}

	/* Step 4: Perform the deletion operation. */
	if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
		/* delete by ptr */
		serverAssertWithInfo(c,zobj,zleptr != NULL);
        zobj->ptr = zzlDelete(zobj->ptr,zleptr);
        deleted = 1;
        /* delete by range */
		/*zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);*/

		if (zzlLength(zobj->ptr) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
		zset *zs = zobj->ptr;

		/* delete by ptr */
		/*serverAssertWithInfo(c,zobj,slele != NULL);
		serverAssertWithInfo(c,zobj,zslDelete(zs->zsl,node_score,slele));
		dictDelete(zs->dict,slele);
		deleted = 1;*/
		/* delete by range */
		deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);

		if (htNeedsResize(zs->dict)) dictResize(zs->dict);
		if (dictSize(zs->dict) == 0) {
			dbDelete(c->db,key);
			keyremoved = 1;
		}
	} else {
		serverPanic("Unknown sorted set encoding");
	}

	/* Step 5: Notifications and reply. */
	if (deleted) {
		signalModifiedKey(c->db,key);
		notifyKeyspaceEvent(NOTIFY_ZSET,"zrem",key,c->db->id); /* we reuse the built in "zrem" keyspace event for pop operation! */
		if (keyremoved)
			notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
		server.dirty += deleted;
	}

	/* Step 6: Return the result in form of a multi-bulk reply */
	if (deleted) {
		addReplyMultiBulkLen(c, 2); /* at most one element with score */
		if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
			if (zlvlen == 0)
				addReplyBulkLongLong(c,zlvlong);
			else
				addReplyBulkCBuffer(c,zlvstr,zlvlen);
			addReplyDouble(c,node_score);
		} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
			addReplyBulk(c,slele);
			addReplyDouble(c,node_score);
		}
	} else {
		addReply(c,shared.emptymultibulk);
	}

cleanup:;
	if (zobj->encoding == OBJ_ENCODING_SKIPLIST)
		decrRefCount(slele);
}

void zlpopifCommand(client *c) {
	zpopGenericCommand(c, 0, 1);
}

void zrpopifCommand(client *c) {
	zpopGenericCommand(c, 1, 1);
}

缺点是:后续官方更新都需要改代码。

时髦势

使用[Redis 4.0模块实现。此处是GitHub传送门。

相比前两种方法,此方法逻辑收归在服务端,且不需要修改Redis源码便于升级。但需要注意资源释放、复制机制等细节,谨防踩坑。

五、修改源码、实现模块后一些问题

1 . 兼容性:要求所有从机、或加载AOF/RDB的实例均实现了新的命令,即均为修改版Redis或均加载了扩展模块。

2 . 命令写入AOF和从机的时机:

  • 对于3.2.X使用LUA法,默认复制脚本本身,但可以使Redis仅复 制导致变更的命令而非整个命令,参考脚本中有关”Replicating commands instead of scripts”和”Selective replication of commands”的内容。
  • 对于3.2.X版本修改源码法,在server.c:call中,仅当有变更设置dirty变量值大于0时,才会触发命令传播,因此如果命令没有成功pop元素将不会产生命令传播。
  • 对于4.0 modules,我们的实现中使用了低级API,则需要实现中根据需要调用RedisModule_ReplicateVerbatim复制命令。

3 . 消息处理失败处理:ZSET中消息被pop后才被client取得处理,若client处理失败则需要client在保证幂等的前提下自行重试。

原创声明,本文系作者授权云+社区-专栏发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏程序员的SOD蜜

使用“消息服务框架”(MSF)实现分布式事务的三阶段提交协议(电商创建订单的示例)

1,示例解决方案介绍 在上一篇 《消息服务框架(MSF)应用实例之分布式事务三阶段提交协议的实现》中,我们分析了分布式事务的三阶段提交协议的原理,现在我们来看看...

2749
来自专栏PingCAP的专栏

TiDB 1.1 Beta Release

2018 年 2 月 24 日,TiDB 发布 1.1 Beta 版。该版本在 1.1 Alpha 版的基础上,对 MySQL 兼容性、系统稳定性做了很多改进。

2686
来自专栏微服务生态

论代码级性能优化变迁之路(一)

大家好,很久没有和大家一起讨论技术了,那么今天我将和大家一起探讨我负责的某项目的性能变迁之路。

602
来自专栏智能大石头

性能&分布式&NewLife.XCode对无限数据的支持

上周发布了《改进版CodeTimer及XCode性能测试》,展示了NewLife.XCode在性能上的表现。实际上NewLife.XCode是一个很平凡的ORM...

2208
来自专栏IT大咖说

TiDB 原理与实战|架构师实践日

摘要 本篇文章出自七牛云和 PingCAP 联合主办的架构师实践日上,来自 PingCAP 的开发工程师李霞分享的《 TiDB 原理与实战》的演讲,介绍了目前分...

3717
来自专栏匠心独运的博客

消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键

751
来自专栏张善友的专栏

使用Hystrix提高系统可用性

今天稍微复杂点的互联网应用,服务端基本都是分布式的,大量的服务支撑起整个系统,服务之间也难免有大量的依赖关系,依赖都是通过网络连接起来。 ? (图片来源:htt...

1935
来自专栏Java编程技术

Java网络编程基础篇

网络通讯在系统交互中是必不可少的一部分,无论是面试还是工作中都是绕不过去的一部分,本节我们来谈谈Java网络编程中的一些知识,本chat内容如下:

731
来自专栏解Bug之路

解Bug之路-记一次对端机器宕机后的tcp行为

机器一般过质保之后,就会因为各种各样的问题而宕机。而这一次的宕机,让笔者观察到了平常观察不到的tcp在对端宕机情况下的行为。经过详细跟踪分析原因之后,发现可以通...

793
来自专栏友弟技术工作室

mysql优化

上篇文章是关于mysql优化的,那个内容是我大学的时候学习的笔记,最近学习发现一些比较好的内容,在这里分享给大家。 版权源于网上。 工作中使用最多的就是MySQ...

3637

扫码关注云+社区