前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RPC(六)

RPC(六)

作者头像
小闫同学啊
发布2019-07-18 15:46:54
1.5K0
发布2019-07-18 15:46:54
举报
文章被收录于专栏:小闫笔记

正文共:5455 字 5 图 预计阅读时间:14 分钟

每日分享

Nothing is so fatiguing as the eternal hanging on of an uncompleted task.

没有哪一件事情比藏在心里的一件无法完成的事情更劳累的。

小闫语录

心头积攒的琐事是阻碍你前进的罪魁祸首,尽管有些事、有些人难以释怀更难放下,但是你的心又有谁懂?放过自己,成全自己,做更好的自己。

RPC

GitHub地址:

代码语言:javascript
复制
https://github.com/EthanYan6/rpc_divide.git

为了方便大家查看,我将RPC相关的代码放在了GitHub上面,大家可以clone到本地进行查看。

历史文章导航:

RPC(一)

RPC(二)

RPC(三)

RPC(四)

RPC(五)

咱们前面已经将RPC消息数据构造好了,接下来呢,就可以通过网络在调用双方进行传递了。传递的方式常用的有两种,一种是TCP(传输控制协议),一种是HTTP。下面我们会逐一进行讲解。

1.RPC传输协议说明

1.1TCP

由于它的可靠性,TCP成为了最常用的方式。而且我们可以直接借助socket工具进行TCP开发。下面我们回顾一些TCP编程的相关知识:

通讯过程:客户端和服务端建立连接,期间涉及到三次握手和四次挥手。

TCP服务端编写

代码语言:javascript
复制
sock = socket.socket() # 创建一个套接字
sock.bind() # 绑定端口
sock.listen() # 监听连接
sock.accept() # 接受新连接
sock.close() # 关闭服务器套接字

TCP客户端编写:

代码语言:javascript
复制
sock = socket.socket() # 创建一个套接字
sock.connect() # 连接远程服务器
sock.recv() # 接收数据
sock.send() # 数据尽可能的发送
sock.sendall() # 数据完全发送
sock.close() # 关闭连接
1.2HTTP

大家应该有点疑惑,TCP处于传输层还好理解,可以用来实现RPC传输。HTTP可是在应用层的协议啊,用它来传输是不是有点大材小用,杀鸡用宰牛刀啊?暂且不考虑这个问题,我们只考虑能否实现的问题,HTTP基于TCP实现,而且HTTP协议已经实现了TCP的收发,它还是一个公共的标准,各种语言都提供了HTTP实现的工具,我们直接通过一些库使用就好了,最大的优点便在于此--方便。

具体怎么操作呢?我们可以将构造好的RPC消息数据嵌入到HTTP报文中的body部分,而对HTTP的path路径等都无需关心。如下:

代码语言:javascript
复制
HTTP/1.0 POST /
Content-Type: binary
Content-Length: 5096

# 此处放置RPC消息数据

HTTP的通讯效率不如TCP高,所以并不常用。

2.客户端传输工具实现

下面我们实现一个RPC的传输协议,先实现客户端传输工具:

代码语言:javascript
复制
class Channel(object):
    """
    用于客户端建立网络连接
    """
    def __init__(self, host, port):
        """
        :param host: 服务器地址
        :param port: 服务器端口号
        """
        self.host = host
        self.port = port

    def get_connection(self):
        """
        获取连接对象
        :return: 与服务器通讯的socket
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect((self.host, self.port))
        return sock

3.服务端传输工具实现

代码语言:javascript
复制
class Server(object):
    """
    RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket,重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        self.host = host
        self.port = port
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 1.开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)

        # 2.接受客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('与客户端%s建立了链接' % str(client_addr))

            # 3.交给ServerStub,完成客户端的具体的RPC调用请求
            stub = ServerStub(client_sock, self.handlers)
            try:
                while True:
                    stub.process()
            except EOFError:
                # 表示客户端关闭了连接:
                print('客户端关闭了连接')
                client_sock.close()

4.ClientStub实现

代码语言:javascript
复制
class ClientStub(object):
    """
    用来帮助客户端完成远程过程调用 RPC调用

    stub = ClientStub()
    stub.divide(200, 100)
    框架是通用的,如果想实现加法,stub.add()类似
    """
    def __init__(self, channel):
        self.channel = channel
        self.conn = self.channel.get_connection()

    # 提供一个方法供客户端进行调用
    def divide(self, num1, num2=1):
        # 将调用的参数打包成消息协议的数据
        proto = DivideProtocol()
        args = proto.args_encode(num1, num2)

        # 将消息数据通过网络发送给服务器
        self.conn.sendall(args)

        # 接收服务器返回的返回值消息数据,并进行解析
        result = proto.result_decode(self.conn)

        # 将结果值(正常float 或 异常InvalidOperation)返回给客户端
        if isinstance(result, float):
            # 正常
            return result
        else:
            # 异常
            raise result

    def add(self):
        pass

5.ServerStub实现

代码语言:javascript
复制
class ServerStub(object):
    """ 
    帮助服务端完成远端过程调用 
    """
    def __init__(self, connection, handlers):
        """
        :param connection: 与客户端的连接
        :param handlers: 真正本地被调用的方法(函数  过程)
        class Handlers:

            @staticmethod
            def divide(num1, num2=1):
                pass

            @staticmethod
            def add():
                pass
        """
        self.conn = connection
        self.method_proto = MethodProtocol(self.conn)
        self.process_map = {
            'divide': self._process_divide
        }
        self.handlers = handlers

    def process(self):
        """
        当服务端接受了一个客户端的连接,建立好连接后,完成远端调用处理
        :return:
        """
        # 1.接收消息数据,并解析方法的名字
        name = self.method_proto.get_method_name()

        # 2.根据解析获得的方法(过程)名,调用响应的过程协议,接收并解析消息数据
        # self.process_map[name]()
        _process = self.process_map[name]
        _process()

    def _process_divide(self):
        """
        处理除法过程调用
        :return:
        """
        # 1.创建用于除法过程调用参数协议数据解析的工具
        proto = DivideProtocol()
        # 2.解析调用参数消息数据
        args = proto.args_decode(self.conn)
        # args = {"num1": xxx, "num2": xxx}

        # 3.进行除法的本地过程调用
        # 将本地调用过程的返回值(包括可能的异常)打包成消息协议数据,通过网络返回给客户端
        try:
            val = self.handlers.divide(**args)
        except InvalidOpreation as e:
            ret_message = proto.result_encode(e)
        else:
            ret_message = proto.result_encode(val)

        self.conn.sendall(ret_message)


    # def _process_add(self):
    #     pass

6.服务器与客户端调用案例实现

我们模拟一下服务器与本地调用。

6.1server

创建一个 server.py文件:

代码语言:javascript
复制
from services import InvalidOpreation
from services import Server

class Handlers:

    @staticmethod
    def divide(num1, num2=1):
        """
        除法
        :param num1: int
        :param num2: int
        :return: float
        """
        # 增加判断操作,抛出自定义异常
        if num2 == 0:
            raise InvalidOpreation()
        val = num1 / num2
        return val

if __name__ == '__main__':
    # 开启服务器
    _server = Server('127.0.0.1', 8000, Handlers)
    _server.serve()
6.2client

创建一个 client.py文件:

代码语言:javascript
复制
from services import ClientStub
from services import Channel
from services import InvalidOpreation


# 创建与服务器的连接
channel = Channel('127.0.0.1', '8000')

# 创建用于RPC调用的工具
stub = ClientStub(channel)

# 进行调用
try:
    val = stub.divide(200,0)
except InvalidOpreation as e:
    print(e.message)
else:
    print(val)

7.多线程RPC服务器实现

上面就将RPC完整实现了,但是案例有个小缺陷,就是每次服务器只能处理一个客户端,一个处理完成之后才开始处理下一个客户端。为了提升服务器的性能呢,我们可以将其做成多线程版本的RPC服务器。

代码语言:javascript
复制
class ThreadServer(object):
    """
    多线程RPC服务器
    """
    def __init__(self, host, port, handlers):
        # 创建socket的工具对象
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        # 设置socket,重用地址
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # 绑定地址
        self.host = host
        self.port = port
        sock.bind((self.host, self.port))
        self.sock = sock
        self.handlers = handlers

    def serve(self):
        """
        开启服务器运行,提供RPC服务
        :return:
        """
        # 1.开启服务器的监听,等待客户端的连接请求
        self.sock.listen(128)
        print('服务器开始监听')

        # 2.接受客户端的连接请求
        while True:
            client_sock, client_addr = self.sock.accept()
            print('与客户端%s建立了链接' % str(client_addr))

            # 创建子线程处理这个客户端
            t = threading.Thread(target=self.handle, args=(client_sock,))
            # 开启子线程执行
            t.start()

    def handle(self, client_sock):
        """
        子线程调用的方法,用来处理一个客户端的请求
        :return: 
        """
        # 交给ServerStub,完成客户端的具体的RPC调用请求
        stub = ServerStub(client_sock, self.handlers)
        try:
            while True:
                stub.process()
        except EOFError:
            # 表示客户端关闭了连接:
            print('客户端关闭了连接')
            client_sock.close()

到此为止,RPC的简单实现就完成了。下一次文章将开始讲解分布式RPC的相关内容,尽情期待......

优质文章推荐:

redis操作命令总结

MySQL相关操作

SQL查询语句

前端中那些让你头疼的英文单词

Flask框架重点知识总结回顾

团队开发注意事项

浅谈密码加密

Django框架中的英文单词

Django中数据库的相关操作

DRF框架中的英文单词

DRF框架

Django相关知识点回顾

python技术面试题-腾讯

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 全栈技术精选 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 每日分享
  • RPC
    • 1.RPC传输协议说明
      • 1.1TCP
      • 1.2HTTP
    • 2.客户端传输工具实现
      • 3.服务端传输工具实现
        • 4.ClientStub实现
          • 5.ServerStub实现
            • 6.服务器与客户端调用案例实现
              • 6.1server
              • 6.2client
            • 7.多线程RPC服务器实现
            相关产品与服务
            云数据库 MySQL
            腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档