前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >学习T-io框架,从写一个Redis客户端开始

学习T-io框架,从写一个Redis客户端开始

作者头像
talent-tan
修改2019-08-07 15:33:02
6450
修改2019-08-07 15:33:02
举报
文章被收录于专栏:t-io小笔记t-io小笔记

原文地址:https://www.t-io.org/137

前言

  了解T-io框架有些日子了,并且还将它应用于实战,例如 tio-websocket-server,tio-http-server等。但是由于上述两个server已经封装好,直接应用就可以。所以对于整个数据流通的过程不是很明朗,甚至对于hello-world例子中的encode,decode作用并不理解。于是乎想写一个更贴近实际应用的redis-client来作为学习切入点,虽然编码过程中困难重重,不过最后还是实现了一个粗糙的客户端。由于代码中大量参考了Jedis源码,所以,我给这个客户端起名T-io+Redis=Tedis.哈哈,这些都不重要,下文中将会记录出我的学习和开发历程。

Redis通信协议

  Redis Protocol

  在开发之前,首先要去了解客户端和服务端的通信协议,那么我们开发Redis客户端,就要去看看Redis协议了。所以,下面要做的就是:

  • 明确客户端发送给服务端的消息格式//SET命令 set mykey myvalue //GET命令 get mykey上述两个简单的命令,根据Redis协议可以解析成如下内容//SET命令 *3\r\n$3\r\nset\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n //GET命令 *2\r\n$3\r\nget\r\n$5\r\nmykey\r\n其中 *3代表有三段内容,即 SET,mykey,myvalue.每一段内容之间由 CRLF(\r\n)隔开.$符号后边跟的数字就是数据字节数。引用官方的一个图:
    Jedis源码中,对于消息体的构造比较麻烦,我看的也是云里雾里的,所以在Tedis的实现中我才用了最简单的拼接方式。即StringBuilder根据规则拼接字符串,然后调用getBytes方法获取byte[]。示例代码如下:public static byte[] buildCommandBody(final ProtocolCommand cmd,String... args) { StringBuilder builder = new StringBuilder(); //*[num] builder.append('*') //命令数(1) + 参数的个数 .append(1 + args.length); appendCrLf(builder) //命令长度 $[cmd_length] .append("$") .append(cmd.getName().length()); appendCrLf(builder) //命令内容 cmd .append(cmd.getName()); appendCrLf(builder); //遍历参数,按照 $[num]\r\n[content]\r\n的格式拼接 for (String arg : args) { builder.append("$") .append(arg.length()); appendCrLf(builder) .append(arg); appendCrLf(builder); } //最后转换为 byte[],此处使用 Jedis 中的 SafeEncoder return SafeEncoder.encode(builder.toString()); }调用示例: public static void main(String[] args){ Protocol.buildCommandBody(Protocol.Command.SET,"key","value"); }打印结果:*3 $3 SET $3 key $5 value  那么到此为止,我们已经了解了如何构造发送给服务端的消息,那么如何解析服务端返回的消息呢? Redis 命令会返回多种不同类型的回复。 通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:
  • 明确服务端返回给客户端的消息格式 在此呢,我只简单举一个 GET,SET的例子,其他的内容大家可以去看参考文档。
  • 状态回复(status reply)的第一个字节是 "+"
  • 错误回复(error reply)的第一个字节是 "-"
  • 整数回复(integer reply)的第一个字节是 ":"
  • 批量回复(bulk reply)的第一个字节是 "$"
  • 多条批量回复(multi bulk reply)的第一个字节是 "*"   时间有限,我也只是完成了状态回复和批量回复的部分功能,下文中将以这两种回复作为讲解示例。

T-io登场

  由于只是客户端的开发,所以这里我们只会用到TioClient。所以,我们先把Redis-Server连接上。ClientAioHandler,ClientAioListener,ClientGroupContext自然是少不了的啦,直接上代码吧。

  • 初始化一个 ServerNode Node serverNode = new Node("127.0.0.1",6379);
  • 初始化一个ClientGroupContext,它依赖于ClientAioHandler,ClientAioListener
代码语言:txt
复制
 ClientGroupContext clientGroupContext = new ClientGroupContext(tioClientHandler, aioListener, null);
  • 初始化一个TioClient TioClient tioClient = new TioClient(clientGroupContext);
  • 最后连接服务器,如果没有什么异常打印的话,就连接成功啦//返回的ClientChannelContext 用于发送消息使用 ClientChannelContext clientChannelContext = tioClient.connect(serverNode);  恭喜你,一个Redis客户端宝宝就此诞生,只不过它还不会说话。结合上文协议部分的内容,我们发送一条消息给服务器。首先定义消息包:public class TedisPacket extends Packet { private byte[] body; //getter setter }然后调用Tio.send方法就可以啦。 Tio.send(clientChannelContext, packet);如果你已经看懂了上半部分,那么你就会知道这里 TedisPacket中的body的值就是通过Protocol.buildCommandBody(Protocol.Command.SET,"key","value");来生成的。不要忘了 `ClientAioHandler.encode’方法哦。 @Override public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) { TedisPacket tedisPacket = (TedisPacket) packet; byte[] body = tedisPacket.getBody(); int bodyLen = 0; if (body != null) { bodyLen = body.length; } //只是简单将 body 放入 ByteBuffer 。 ByteBuffer buffer = ByteBuffer.allocate(bodyLen); buffer.put(body); return buffer; }到此为止,客户端向服务器发送消息的内容已经写完了。下面将介绍如何解析服务端的响应。

  当服务器正常,并且发送到服务器的消息格式符合RESP协议的话,那么服务器会返回你相应的内容,比如我们发送SET命令,服务器的正常响应是+OK\r\n.下面我们看ClientAioHandler.decode方法。当我批量向服务器发送消息时,服务器给我的响应也是批量接收到的。打印结果如下:

那么问题来了,我们只想要每一次发送对应一个OK.所以,原谅我这个菜鸟,我才明白decode方法的目的。那么,我们就去解析这个内容。解析过程有几个需要关注的地方:

  • 遇到第一个 \r的时候,下一个字节一定是'\n'否则,作为解析失败处理。byte first = buffer.get();以 +OK\r\n举例: private TedisPacket readSingleLinePacket(ByteBuffer buffer,int limit,int position) throws AioDecodeException { byte[] body = new byte[limit - position]; int i = 0; //结束标志 boolean endFlag = false; while (buffer.position() <= limit) { byte b = buffer.get(); //如果是\r if (BufferReader.isCr(b)) { byte c = buffer.get(); //如果不是\n抛出异常 if (!BufferReader.isLf(c)) { throw new AioDecodeException("unexpected redis server response"); } //结束解析 endFlag = true; break; } else { body[i++] = b; } } //如果此次解析一直没有遇到\r\n,则返回null,等待下次解析 if (!endFlag) { return null; } TedisPacket packet = new TedisPacket(); packet.setBody(body); return packet; }写完解析代码之后,再一次调试结果如下,可以看到数据以5个字节减少,说明数据包被正确解析了。打印内容来自Tio:DecodeRunnable.java.
    到此为止,我们完成了消息的发送和接收,但是问题来了,由于消息是异步接收,那我们如何才能让客户端知道命令调用是否成功呢?<font color=#A52A2A>注意,下文中的内容仅为个人理解,错误之处恳请指正</font> 既然redis是单线程处理的,那么我是否可以理解为,消息的处理就是先到先处理,后到后处理呢?所以,我的解决方式是通过 LinkedBlockingQueue。当解析完一个包之后,将这个包放入阻塞队列中。 @Override public void handler(Packet packet, ChannelContext channelContext) throws Exception { TedisPacket responsePacket = (TedisPacket) packet; if (responsePacket != null) { QueueFactory.get(clientName).put(responsePacket); } }同步接收返回消息: private String getReponse() { for (; ; ) { try { TedisPacket packet = QueueFactory.get(clientName).take(); return packet.hasBody() ? SafeEncoder.encode(packet.getBody()) : null; } catch (InterruptedException e) { e.printStackTrace(); return null; } } }所以set代码就变成这样: @Override public String set(String key, String value) { client.set(key,value); return client.getStatusCodeReply(); }OK,消息接收这块是基于我的理解,我也不知道对不对,而且,其中的BUG肯定也是多的数不胜数,没关系,抱着学习的心态慢慢去完善就好了。Jedis也不是一次两次就写成的对吧。Tedis 与 Jedis  在开发过程中,我阅读了很多Jedis的源代码,大体思路能看懂,可是很多细节处理对我来说就比较难了,大神的代码只可膜拜。不过也给了我很多启发。最后不知天高地厚的和人家做一下对比吧。 public static void main(String[] args) { Jedis tedis = new Jedis("192.168.1.225", 6379); long start = SystemTimer.currentTimeMillis(); for (int i = 0; i < 200; i++) { tedis.set("tedis", "tedis"); } tedis.get("tedis"); long end = SystemTimer.currentTimeMillis(); System.out.println("总共用时:" + (end - start) + "ms,平均用时:" + ((end - start) / 100) + "ms"); }Jedis结果:总共用时:262ms,平均用时:2ms Tedis结果:总共用时:390ms,平均用时:3ms
  • \r\n之后停止本轮解析,返回解析结果。 基于上述注意事项,解析代码如下:(应该会有更优秀的方法) 先获取第一个字节,它应该是 + - $ : *的其中一个,如果不是的话,说明消息可能是上一次不完整导致的,等待下次解析。

那么这一毫秒差在哪里呢?

总结

  一篇博客简单介绍了Redis客户端的开发过程,当然对于成熟的客户端Jedis来说,也就是一个HelloWorld,不过这有什么关系呢?知其然,更要知其所以然。看了大神的代码才知道自己有多渺小哦。继续加油~~

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Redis通信协议
  • T-io登场
  • 总结
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档