正文共:3691 字 5 图 预计阅读时间:10 分钟
Adopt the pace of nature, her secret is patience.
请接受大自然的节奏,她的秘诀就是耐心。
小闫语录:
耐心,静心,以自然的节奏对待生活,以自然的节奏处理事情。你会发现一切没有那么糟糕,事事如此平和顺畅。
历史文章导航:
上篇文章我们对消息协议进行了设计,并将其原理解释清楚。今天呢,我们用代码将设计作以实现。
明确好大体思路后,我们开始敲代码了。
1.首先定义一个类:
class DivideProtocol(object):
"""
divide过程消息协议转换工具
"""
2.然后写一个对请求参数转换为消息数据的方法:
代码结合上一篇文章的设计方案进行阅读,会容易的多。
def args_encode(self, num1, num2=1):
"""
将原始的调用请求参数转换打包成二进制消息数据
:param num1: int
:param num2: int
:return: bytes 二进制消息数据
"""
name = 'divide'
# 处理方法的名字 字符串
# 处理字符串的长度
buff = struct.pack('!I', 6)
# 处理字符
buff += name.encode()
# 处理参数1
# 处理序号
buff2 = struct.pack('!B', 1)
# 处理参数值
buff2 += struct.pack('!i', num1)
# 处理参数2
if num2 != 1:
# 处理序号
buff2 += struct.pack('!B', 2)
buff2 += struct.pack('!i', num2)
# 处理消息长度,边界设定
length = len(buff2)
buff += struct.pack('!I', length)
buff += buff2
return buff
i:代表的是有符号整数类型用4个字节进行表示。因为参数是有正负的。 B:代表的是整数类型用1个字节进行表示。
接下来我们编写一个方法,用以对请求消息数据的解码操作。
在进行解码操作之前,思考一个问题,就是方法接收的参数是什么?有人会说,这还不容易吗,直接将全部数据接收然后解析,传递一个data即可。可以是可以,但是不方便,因为我们传递的数据大小未知,全部解析完成之后,里面都进行了解码操作,消息数据原先的格式被打乱,无法确定每一个消息的边界,所以需要边解析边确定。直到读取完方法名之后,继续读取边界长度才变成已知。
既然需要边读取边解析,那么就需要一个读取数据的来源。因为我们的RPC建立在网络通讯基础之上,毕竟是远程调用,对吧?所以我们就可以从网络数据中读取数据。在TCP中,我们可以利用socket进行相关操作,定义connection,建立一个网络连接通道,边读取数据边进行解析。
还有一个问题,因为底层TCP的socket在封装完工具类之后,我们还未实现,但是又想测试此工具类,为了方便,需要有一个读取二进制的东西,在此我们引入BytesIO对象,来往外读取二进制数据。此处我们先书写参数解码操作的方法:
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据 bytes
"""
# self.conn
pass
def args_decode(self, connection):
"""
接收调用请求消息数据并进行解析
:param connection: 连接对象 socket BytesIO
:return: dict 包含了解析之后的参数
"""
# 参数长度映射关系
param_len_map = {
1: 4,
2: 4
}
# 参数格式映射关系
param_fmt_map = {
1: '!i',
2: '!i'
}
# 参数名字映射关系
param_name_map = {
1: 'num1',
2: 'num2'
}
# 保存用来返回参数的字典
# args = {"num1": xxx, "num2": xxx}
args = {}
self.conn = connection
# 处理方法的名字已经提前被处理
# 后面我们会实现一个方法专门处理不同的调用请求的方法名解析
# 处理消息边界
# 读取二进制数据
# socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理
# BytesIO.read
buff = self._read_all(4)
# 将二进制数据转换为python的数据类型
length = struct.unpack('!I', buff)[0]
# 已经读取处理的字节数
have = 0
# 处理第一个参数
# 1.处理参数序号
buff = self._read_all(1)
have += 1
param_seq = struct.unpack('!B', buff)[0]
# 2.处理参数值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
have += param_len
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt, buff)[0]
param_name = param_name_map[param_seq]
args[param_name] = param
if have >= length:
return args
# 处理第二个参数
# 1.处理参数序号
buff = self._read_all(1)
param_seq = struct.unpack('!B', buff)[0]
# 2.处理参数值
param_len = param_len_map[param_seq]
buff = self._read_all(param_len)
param_fmt = param_fmt_map[param_seq]
param = struct.unpack(param_fmt, buff)[0]
param_name = param_name_map[param_seq]
args[param_name] = param
return args
由于 self.conn
可能有不同的类型(可能是socket类型,可能是bytes类型),因此我们需要根据不同的类型按照不同的方法进行读取操作。
from io import BytesIO
def _read_all(self, size):
"""
帮助我们读取二进制数据
:param size: 想要读取的二进制数据大小
:return: 二进制数据 bytes
"""
# self.conn
# 读取二进制数据
# socket.recv(4) => ?4 判断读取的数据是否为4个,直到4个字节我们才进行处理
# BytesIO.read
if isinstance(self.conn, BytesIO):
# 只涉及到本地操作,未涉及网络,不需要特殊处理,因为测试代码需要,此处才进行引用
buff = self.conn.read(size)
return buff
else:
# socket类型数据如何处理
# 因涉及到网络,获取到的数据未必是所需大小,所以需要判断
have = 0
buff = b''
while have < size:
chunk = self.conn.recv(size - have)
buff += chunk
l = len(chunk)
have += l
if l == 0:
# 表示客户端socket关闭了
raise EOFError()
return buff
还记得在实现请求消息转换为原始数据的方法中我们留的一个坑吗?现在将其填上。因为解析方法名的方法是通用的,根据解析出对应的方法名再执行对应的调用过程,即对应的协议。所以此方法是独立于 DivideProtocol
之外的一个类。我们来定义一下:
class MethodProtocol(object):
"""
解读方法名字
"""
def __init__(self, connection):
self.conn = connection
def _read_all(self, size):
"""此处方法同3.read_all方法中实现的代码,因此不再重复书写"""
# 当然,如果你愿意,直接用类的继承也未尝不可
......
def get_method_name(self):
"""
提供方法名
:return: str 方法名
"""
# 1.读取字符串长度
buff = self._read_all(4)
length = struct.unpack('!I', buff)[0]
# 2.读取字符串
buff = self._read_all(length)
name = buff.decode()
return name
优质文章推荐: