首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

NET Core 实现RedisClient

来源:YSWALLE

cnblogs.com/yswenli/p/8608661.html

引言

工作上有需要使用Redis,于是便心血来潮打算自己写一个C#客户端。经过几天的努力,目前该客户端已经基本成型,下面简单介绍一下。

通信协议

要想自行实现redisClient,则必须先要了解Redis的socket能信协议。新版统一请求协议在 Redis 1.2 版本中引入, 并最终在 Redis 2.0 版本成为 Redis 服务器通信的标准方式。在这个协议中, 所有发送至 Redis 服务器的参数都是二进制安全(binary safe)的。

以下是这个协议的一般形式

* CR LF

$ CR LF

CR LF

...

$ CR LF

CR LF

注:命令本身也作为协议的其中一个参数来发送。举个例子, 以下是一个命令协议的打印版本:

*3

$3

SET

$5

mykey

$7

myvalue

这个命令的实际协议值如下:

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

稍后看到, 这种格式除了用作命令请求协议之外, 也用在命令的回复协议中: 这种只有一个参数的回复格式被称为批量回复(Bulk Reply)。

统一协议请求原本是用在回复协议中, 用于将列表的多个项返回给客户端的, 这种回复格式被称为多条批量回复(Multi Bulk Reply)。一个多条批量回复以 *\r\n 为前缀, 后跟多条不同的批量回复, 其中 argc 为这些批量回复的数量。

Redis 命令会返回多种不同类型的回复。一个状态回复(或者单行回复,single line reply)是一段以 "+" 开始、 "\r\n" 结尾的单行字符串。通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:

状态回复(status reply)的第一个字节是 "+"

错误回复(error reply)的第一个字节是 "-"

整数回复(integer reply)的第一个字节是 ":"

批量回复(bulk reply)的第一个字节是 "$"

多条批量回复(multi bulk reply)的第一个字节是 "*"

.NET Core Socket

说起socket,就不得不说IOCP了,这个方案本身就是为了解决多连接、高并发而设计的;但是话又说回来,任何方案都有局限性,不可能解决所有问题;这里不去讨论用在这里是否合适,反正本人就是想这么试一把:用一个简单的ioc模式实现SAEA.Socket,并为此设定各种场景,反过来优化SAEA.Socket本身。下面是一段服务器接收连接的代码:

private void ProcessAccept(SocketAsyncEventArgs args)

{

if (args == null)

{

args = new SocketAsyncEventArgs();

args.Completed += ProcessAccepted;

}

else

{

args.AcceptSocket = null;

}

if (!_listener.AcceptAsync(args))

{

ProcessAccepted(_listener, args);

}

}

项目结构

在网上找到redis的命令文档后,本人觉的准备工作差不多了,可以初步定一下项目结构:

Core:定义的是Redisclient相关最基本的业务

Interface:定义的是一些需要抽象出来的接口

Model:定义的是redis的数据模型及其请求、回复的类型枚举

Net:这里就是将继承实现SAEA.Socket而来的RedisConnection通信基础

命令解码器

通过前面的准备工作了解到redisClient的关键在于命令的编解码,至于高大上算法或redis官方算法的实现,本人没有去详细了解,一冲动就自行实现了自定义版的解码器。

public string Coder(RequestType commandName, params string[] @params)

{

_autoResetEvent.WaitOne();

_commandName = commandName;

var sb = new StringBuilder();

sb.AppendLine("*" + @params.Length);

foreach (var param in @params)

{

sb.AppendLine("$" + param.Length);

sb.AppendLine(param);

}

return sb.ToString();

}

public ResponseData Decoder()

{

var result = new ResponseData();

string command = null;

string error = null;

var len = 0;

switch (_commandName)

{

case RequestType.PING:

command = BlockDequeue();

if (GetStatus(command, out error))

{

result.Type = ResponseType.OK;

result.Data = "PONG";

}

else

{

result.Type = ResponseType.Error;

result.Data = error;

}

break;

case RequestType.AUTH:

case RequestType.SELECT:

case RequestType.SLAVEOF:

case RequestType.SET:

case RequestType.DEL:

case RequestType.HSET:

case RequestType.HDEL:

case RequestType.LSET:

command = BlockDequeue();

if (GetStatus(command, out error))

{

result.Type = ResponseType.OK;

result.Data = "OK";

}

else

{

result.Type = ResponseType.Error;

result.Data = error;

}

break;

case RequestType.TYPE:

command = BlockDequeue();

if (GetStatusString(command, out string msg))

{

result.Type = ResponseType.OK;

}

else

{

result.Type = ResponseType.Error;

}

result.Data = msg;

break;

case RequestType.GET:

case RequestType.GETSET:

case RequestType.HGET:

case RequestType.LPOP:

case RequestType.RPOP:

case RequestType.SRANDMEMBER:

case RequestType.SPOP:

len = GetWordsNum(BlockDequeue(), out error);

if (len == -1)

{

result.Type = ResponseType.Empty;

result.Data = error;

}

else

{

result.Type = ResponseType.String;

result.Data += BlockDequeue();

}

break;

case RequestType.KEYS:

case RequestType.HKEYS:

case RequestType.LRANGE:

case RequestType.SMEMBERS:

result.Type = ResponseType.Lines;

var sb = new StringBuilder();

var rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

//再尝试读取一次,发现有回车行出现

if (rn == -1) rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (rn > 0)

{

for (int i = 0; i

{

len = GetWordsNum(BlockDequeue(), out error);

sb.AppendLine(BlockDequeue());

}

}

result.Data = sb.ToString();

break;

case RequestType.HGETALL:

case RequestType.ZRANGE:

case RequestType.ZREVRANGE:

result.Type = ResponseType.KeyValues;

sb = new StringBuilder();

rn = GetRowNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (rn > 0)

{

for (int i = 0; i

{

len = GetWordsNum(BlockDequeue(), out error);

sb.AppendLine(BlockDequeue());

}

}

result.Data = sb.ToString();

break;

case RequestType.DBSIZE:

case RequestType.EXISTS:

case RequestType.EXPIRE:

case RequestType.PERSIST:

case RequestType.SETNX:

case RequestType.HEXISTS:

case RequestType.HLEN:

case RequestType.LLEN:

case RequestType.LPUSH:

case RequestType.RPUSH:

case RequestType.LREM:

case RequestType.SADD:

case RequestType.SCARD:

case RequestType.SISMEMBER:

case RequestType.SREM:

case RequestType.ZADD:

case RequestType.ZCARD:

case RequestType.ZCOUNT:

case RequestType.ZREM:

case RequestType.PUBLISH:

var val = GetValue(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

if (val == 0)

{

result.Type = ResponseType.Empty;

}

else

{

result.Type = ResponseType.OK;

}

result.Data = val.ToString();

break;

case RequestType.INFO:

var rnum = GetWordsNum(BlockDequeue(), out error);

if (!string.IsNullOrEmpty(error))

{

result.Type = ResponseType.Error;

result.Data = error;

break;

}

var info = "";

while (info.Length

{

info += BlockDequeue();

}

result.Type = ResponseType.String;

result.Data = info;

break;

case RequestType.SUBSCRIBE:

var r = "";

while (IsSubed)

{

r = BlockDequeue();

if (r == "message\r\n")

{

result.Type = ResponseType.Sub;

BlockDequeue();

result.Data = BlockDequeue();

BlockDequeue();

result.Data += BlockDequeue();

break;

}

}

break;

case RequestType.UNSUBSCRIBE:

var rNum = GetRowNum(BlockDequeue(), out error);

var wNum = GetWordsNum(BlockDequeue(), out error);

BlockDequeue();

wNum = GetWordsNum(BlockDequeue(), out error);

var channel = BlockDequeue();

var vNum = GetValue(BlockDequeue(), out error);

IsSubed = false;

break;

}

_autoResetEvent.Set();

return result;

}

命令的封装与测试

有了socket、redisCoder之后,现在就可以按照官方的redis命令来进行.net core的封装了。

本人将这些操作封装到RedisClient、RedisDataBase两个类中,然后又想到连接复用的问题,简单实现了一个连接池RedisClientFactory的类。这样一来就可以好好的来实验一把,看看之前的设想最终能不能实现了:

/****************************************************************************

*Copyright (c) 2018 Microsoft All Rights Reserved.

*CLR版本: 4.0.30319.42000

*机器名称:WENLI-PC

*公司名称:Microsoft

*命名空间:SAEA.RedisSocketTest

*文件名: Program

*版本号: V1.0.0.0

*唯一标识:3d4f939c-3fb9-40e9-a0e0-c7ec773539ae

*当前的用户域:WENLI-PC

*创建人: yswenli

*创建时间:2018/3/17 10:37:15

*描述:

*

*=====================================================================

*修改标记

*修改时间:2018/3/19 10:37:15

*修改人: yswenli

*版本号: V1.0.0.0

*描述:

*

*****************************************************************************/

using SAEA.Commom;

using SAEA.RedisSocket;

using System;

namespace SAEA.RedisSocketTest

{

class Program

{

static void Main(string[] args)

{

ConsoleHelper.Title = "SAEA.RedisSocketTest";

ConsoleHelper.WriteLine("输入ip:port连接RedisServer");

var ipPort = ConsoleHelper.ReadLine();

if (string.IsNullOrEmpty(ipPort))

{

ipPort = "127.0.0.1:6379";

}

RedisClient redisClient = new RedisClient(ipPort);

redisClient.Connect();

//redisClient.Connect("wenli");

var info = redisClient.Info();

if (info.Contains("NOAUTH Authentication required."))

{

while (true)

{

ConsoleHelper.WriteLine("请输入redis连接密码");

var auth = ConsoleHelper.ReadLine();

if (string.IsNullOrEmpty(auth))

{

auth = "yswenli";

}

var a = redisClient.Auth(auth);

if (a.Contains("OK"))

{

break;

}

else

{

ConsoleHelper.WriteLine(a);

}

}

}

//redisConnection.SlaveOf();

//redisConnection.Ping();

redisClient.Select(1);

//ConsoleHelper.WriteLine(redisConnection.Type("key0"));

ConsoleHelper.WriteLine("dbSize:", redisClient.DBSize().ToString());

RedisOperationTest(redisClient, true);

ConsoleHelper.ReadLine();

}

private static void RedisOperationTest(object sender, bool status)

{

RedisClient redisClient = (RedisClient)sender;

if (status)

{

ConsoleHelper.WriteLine("连接redis服务器成功!");

#region key value

ConsoleHelper.WriteLine("回车开始kv插值操作...");

ConsoleHelper.ReadLine();

for (int i = 0; i

{

redisClient.GetDataBase().Set("key" + i, "val" + i);

}

//redisConnection.GetDataBase().Exists("key0");

ConsoleHelper.WriteLine("kv插入完成...");

ConsoleHelper.WriteLine("回车开始获取kv值操作...");

ConsoleHelper.ReadLine();

var keys = redisClient.GetDataBase().Keys().Data.ToArray(false, "\r\n");

foreach (var key in keys)

{

var val = redisClient.GetDataBase().Get(key);

ConsoleHelper.WriteLine("Get val:" + val);

}

ConsoleHelper.WriteLine("获取kv值完成...");

ConsoleHelper.WriteLine("回车开始开始kv移除操作...");

ConsoleHelper.ReadLine();

foreach (var key in keys)

{

redisClient.GetDataBase().Del(key);

}

ConsoleHelper.WriteLine("移除kv值完成...");

#endregion

#region hashset

string hid = "wenli";

ConsoleHelper.WriteLine("回车开始HashSet插值操作...");

ConsoleHelper.ReadLine();

for (int i = 0; i

{

redisClient.GetDataBase().HSet(hid, "key" + i, "val" + i);

}

ConsoleHelper.WriteLine("HashSet插值完成...");

ConsoleHelper.WriteLine("回车开始HashSet插值操作...");

ConsoleHelper.ReadLine();

var hkeys = redisClient.GetDataBase().GetHKeys(hid).Data.ToArray();

foreach (var hkey in hkeys)

{

var val = redisClient.GetDataBase().HGet(hid, hkey);

ConsoleHelper.WriteLine("HGet val:" + val.Data);

}

var hall = redisClient.GetDataBase().HGetAll("wenli");

ConsoleHelper.WriteLine("HashSet查询完成...");

ConsoleHelper.WriteLine("回车开始HashSet移除操作...");

ConsoleHelper.ReadLine();

foreach (var hkey in hkeys)

{

redisClient.GetDataBase().HDel(hid, hkey);

}

ConsoleHelper.WriteLine("HashSet移除完成...");

#endregion

//redisConnection.GetDataBase().Suscribe((c, m) =>

//{

// ConsoleHelper.WriteLine("channel: msg:", c, m);

// redisConnection.GetDataBase().UNSUBSCRIBE(c);

//}, "c39654");

ConsoleHelper.WriteLine("测试完成!");

}

else

{

ConsoleHelper.WriteLine("连接失败!");

}

}

}

}

经过上面的代码测试,使用redis-cli工具进行monitor命令监控发现——搞定了!另外源码本人已发到github上面了,SAEA.RedisSocket的详细可查看:https://github.com/yswenli/SAEA/tree/master/Src/SAEA.RedisSocket

看完本文有收获?请转发分享给更多人

关注「DotNet」,提升.Net技能

淘口令:复制以下红色内容,再打开手淘即可购买

范品社,使用¥极客T恤¥抢先预览(长按复制整段文案,打开手机淘宝即可进入活动内容)

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180414B0IZB100?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券