跨平台PHP调试器设计及使用方法——通信

        首先引用《跨平台PHP调试器设计及使用方法——探索和设计》中的结构图(转载请指明出于breaksoftware的csdn博客)

        本文要介绍的是我们逻辑和pydbgp通信的实现(图中红框内内容)。

        设计通信之前,我需要先设计一种通信协议,其实就是一个数据打包和解包的协议。因为我们的数据非常简单,所以只是用“”长度+数据“”的结构。我们规定一个包的前8个字节表示数据的总长度(包括这个8个字节的长度),然后跟着的就是数据。

class socket_protocol:
    def __init__(self):
        self.response = ""
        pass
    
    def pack_request(self, request):
        request_len = len(request) + 8
        package = '{:0>8}'.format(request_len)
        package += request
        return package
    
    def input_response(self, data):
        self.response += data
        
    def data_valid(self):
        if len(self.response) < 8:
            return False
        length = self.response[:8]
        if int(length) == len(self.response):
            return True
        else:
            return False
        
    def clear(self):
        self.response = ""
        
    def get_response(self):
        if False == self.data_valid():
            return ""
        return self.response[8:]

        pack_request用于将数据打包组装;input_response是为了让数据接收方可以一直接收数据(因为不是每次调用input_response就可以把所有数据都读取,这在数据量很大是比较常见);data_valid用于检测接收到的数据是否已经接收完毕,因为数据接收并非一次性完成,所以我们需要一个逻辑判断是否还需要接收数据。因为我们的数据有严密的结构,我们可以通过接收的数据长度判断数据是否接收完毕。get_response则是在数据接收完毕后,接收方调用获取完整的数据。         因为服务端和客户端都存在数据打包发送和解包的工作,所以socket_protocol将是整个通信数据的基础类。

        我们再看下稍微简单点的服务器代码

import os
import time
import socket
import threading
from socket_protocol import socket_protocol

def deal_data(data):
    send_data = "recv" + data
    return send_data

class socket_server:
    def __init__(self, deal_func = None):
        self._stop_event = threading.Event()
        self._communicate_thread = None
        if deal_func:
            self.deal_func = deal_func
        else:
            self.deal_func = deal_data
        pass
    
    def __del__(self):
        self.Stop()
        pass
    
    def Start(self):
        if self._communicate_thread:
            return
        self._communicate_thread = threading.Thread(target=self._worker)
        self._communicate_thread.start()
        
    def _worker(self):
        self._stop_event.clear()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("localhost", 9001))
        s.listen(1)
        con,addr = s.accept()
        s_data = socket_protocol()
        while False == self._stop_event.wait(0.1):
            data = con.recv(1024)
            s_data.input_response(data)
            if s_data.data_valid():
                send_data = self.deal_func(s_data.get_response())
                con.sendall(s_data.pack_request(send_data))
                s_data.clear()
        con.close()
        
    def Stop(self):
        self._stop_event.set()
        while self._communicate_thread.is_alive():
            time.sleep(0.01)
        self._communicate_thread = None

        socket_server类在构造函数中暴露了一个参数,用于指定处理接收到数据的函数入口地址。这样我们就让服务器通信这块逻辑和数据处理业务解耦。而全局deal_data方法,则是在用户没有传入处理数据的函数指针时的一个替代品,它没有任何作用,只是为了保证代码的严谨性。在Start函数中,我们启动了一个用于接收和处理数据的线程。相应的Stop方法则是终止该线程执行。这个类的核心是线程函数_worker的实现。它在本地绑定了9001端口,然后不停的从该端口读取数据。如果协议类socket_protocol的对象判断本次读取数据已经完毕,它就会调用构造函数中传入的方法处理获取的数据,然后将该方法返回的数据打包后发给请求方。         客户端的实现则稍微复杂点

import os
import time
import socket
import threading
from socket_protocol import socket_protocol

class socket_client:
    def __init__(self):
        self._response_ready = threading.Event()
        self._stop_event = threading.Event()
        self._lock_excute = threading.Lock()
        self._communicate_thread = None
        self._cmd = ""
        self._result = ""
        pass
    
    def __del__(self):
        self.Stop()
        pass
    
    def Query(self,cmd):
        self._lock_excute.acquire()
        self._cmd = cmd
        self._response_ready.clear()
        self._response_ready.wait()
        result = self._result
        self._lock_excute.release()
        return result
    
    def _worker(self):
        self._stop_event.clear()
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(("localhost", 9001))
        s_data = socket_protocol()
        while False == self._stop_event.wait(0.1):
            query = ""
            if len(self._cmd) == 0:
                continue
           
            query = self._cmd 
            self._cmd = ""
            
            send_data = s_data.pack_request(query)
            s.sendall(send_data)
            while False == self._stop_event.wait(0.1):
                data = s.recv(1024)
                s_data.input_response(data)
                if s_data.data_valid():
                    self._result = s_data.get_response()
                    s_data.clear()
                    self._response_ready.set()
                    break
        s.close()

    def Start(self):
        if self._communicate_thread:
            return
        self._communicate_thread = threading.Thread(target=self._worker)
        self._communicate_thread.start()
        
    def Stop(self):
        self._stop_event.set()
        while self._communicate_thread.is_alive():
            time.sleep(0.01)
        self._communicate_thread = None

        Start和Stop函数分别用于启动和关闭一个发送请求接收数据的线程。线程函数_worker也和socket_server中类似,只是它要不停判断self._cmd是否有数据,也就是是否有请求进来。如果有请求进来,它就将该请求通过socket_protocol打包发送给服务端,然后在从服务端取回结果并解包,把解包后的结果放入self._result中。通过self._response_ready.set()设置事件告知业务方请求完毕,可以来拿结果了。Query函数就是业务方调用的入口,它使用锁操作保证每次只能有一次查询行为。然后通过设置self._cmd告知线程要向服务器发送该指令,然后通过等待线程设置self._response_ready事件来等待请求返回,并把结果返回给调用方。客户端设计的比较复杂的一个重要原因是我们这个模型要求请求是有序的。因为在测试过程中,我发现pydbgp是非常脆弱的,往往因为一些不合常理的查询顺序导致整个程序都死掉。所以我把“有序”的特性设计在了客户端基础类中。

        看完基础类,我们再来看看我们需要控制的pydbgp是怎么被调用的。在《跨平台PHP调试器设计及使用方法——探索和设计》一文中,我说明过我只是想把pydbgp当成一个工具来使用,而尽量不要对其源码有任何改动——除非有bug。因为pydbgp不能像API一样使用,所以我只能模拟标准输入输出来达到和它的交互。而如果标准输入输出被改变,将影响整个程序,所以为了避开这种设计对我们自己的代码及其他第三方库的影响。我们需要将pydbgp作为一个独立的进程来执行。        我们需要重定向标准输入和输出,于是我设计了一个重定向之后的输入类input_redirection,其核心的就两个函数

    def readlines(self, size=-1):
        while len(self._data) == 0:
            time.sleep(0.01)
        self._lock_excute.acquire()
        ret_data = self._data
        self._data = ""
        self._lock_excute.release()
        return ret_data
    
    def write(self, data):
        self._lock_excute.acquire()
        logging.debug("reqeust: " + data)
        self._data = data
        self._lock_excute.release()

        write函数用于从服务器中接收请求的内容,然后重定向之后的输入通过readlines读取内容。从而达到模拟请求的目的。

        而重定向标准输出类则相对复杂点,因为它要牵扯到数据的内容。pydbgp在调试过程中分为两种状态,一种是调试某个session的阶段,就是下图中4的过程,以后我们称该阶段为session阶段;另外一种是不调试任何session的阶段,即除去4之外的其他阶段,之后我们称该阶段为no_session阶段。 因为pydbgp比较脆弱,不能在不同阶段调用另一个阶段的命令,轻则告知出错,重则整个程序都死掉。所以我们必须在执行每条指令后判断其所处于的阶段,而这种判断规则则和其返回的数据特征有关。

        而作为命令发起方,在发起一个命令后,可以获取命令执行的结果。而对于当前pydbgp处于什么阶段,则也需要知道,否则不能保证发送的下条命令会不会把pydbgp搞挂掉。所以我就在返回结果中加入一些特征,使得命令发起方可以得知指令执行后的调试器阶段信息。具体的做法就是在数据后加入特征码,这个逻辑是在_send_data中实现的。

class output_redirction:
    _out = None
    _query_event = None
    _data = ""
    _response = ""
    def __init__(self, query_event):
        self._out = sys.stdout
        self._query_event = query_event

    def write(self, output_stream):
        if re.match('^\[dbgp-', output_stream):
            self._send_data(True)
        elif re.match('^\[dbgp\]', output_stream):
            self._send_data(False)
        else:
            self._data += output_stream
        
    def flush(self):
        pass
        
    def _send_data(self, is_seesion):
        if (is_seesion):
            end_ch = "@\n"
        else:
            end_ch = ":\n"
            
        data = base64.b64encode(self._data) + end_ch
        logging.debug("response:" + self._data + end_ch)
        self._response = data
        self._query_event.set()
        
        self._out.write(data)
        self._data = "" 
        self._out.flush()
        
    def get_reponse(self):
        return self._response

        另一个问题就是我们如何判断当前pydbgp所处的阶段。我们发现如果处在session阶段,则返回的数据是以“[dbgp-”开头的;如果是no_session阶段,则是“[dbgp]”开头的。利用这个特征,我们在write函数中分析出所处阶段,并告知_send_data发送什么样的数据。         剩下的工作便是让整个程序的标准输入和输出被重定向,还有就是启动通信的服务端。

query_event = threading.Event()
out_r = output_redirction(query_event)
in_r = input_redirection()
sys.stdin = in_r
sys.stdout = out_r
sys.stderr = out_r

def Query(cmd):
    logging.debug("query " + cmd)
    query_event.clear()
    in_r.write(cmd)
    query_event.wait()
    return out_r.get_reponse()

if __name__ == "__main__":
    cmd_server = socket_server(Query)
    cmd_server.Start()
    sys.exit(main([0]))

        和服务端相对应的,则存在一个与其交互的客户端。它便是本文最开始结构图中的pydbgpd_stub模块,之所以取名为stub是为了让调用pydbgp像直接调用一样。在pydbgpd_stub中,它明确了pydbgp处于不同阶段可以调用的不同的命令——分别保存在_session_cmd和_no_session_cmd连个数组中。由于命令比较长,这儿就不列出来了。

    def __init__(self):
        self._exc_cmd = "python pydbgpd_proxy.py"
        self._lock_excute = threading.Lock()
        self._cmd_client = socket_client()
        
    def _is_cmd_valid(self, cmd, cmd_list):
        for item in cmd_list:
            if cmd.startswith(item):
                return True
        return False
        
    def start(self):
        if (self._exc_cmd == None):
            raise NameError("exc_cmd is none")
        if "Windows" == platform.system():
            self._process = subprocess.Popen(self._exc_cmd, shell = False)
        else:
            self._process = subprocess.Popen(self._exc_cmd, shell = True,  preexec_fn = os.setpgrp)
        time.sleep(2)
        self._cmd_client.Start()

        pydbgpd_stub在启动服务器进程时,区分了不同的操作系统。这也是没有办法的事,因为不同系统里,终止子进程和孙子进程的方法不能通用。

    def stop(self):
        self._cmd_client.Stop()
        
        if not self._process:
            raise NameError("subprocess is none")
        else:
            if "Windows" != platform.system():
                pid = self._process.pid
                pgid = os.getpgid(pid)
                os.kill(-pgid, 9)
            self._process.terminate()
            self._process.kill()
            self._process = None

        接下来我们要看其暴露的最主要的两个方法

    def is_session(self):
        self._lock_excute.acquire()
        is_session = self._is_session
        self._lock_excute.release()
        return is_session
    
    def query(self, query_cmd):
        data = ""
        if self._is_session:
            if not self._is_cmd_valid(query_cmd, self._session_cmd):
                return "invalid cmd"
        else:
            if not self._is_cmd_valid(query_cmd, self._no_session_cmd):
                return "invalid cmd"
        
        self._lock_excute.acquire()
        data = self._cmd_client.Query(query_cmd)
        
        if len(data) > 1:
            if data[-2] == "@":
                print "Switch to Session \n"
                self._is_session = True
            elif data[-2] == ":":
                self._is_session = False
                print "Switch to No Session \n"
            data = base64.b64decode(data[:-2])
        
        self._lock_excute.release()
        
        return data  

        is_session方法用于告知调用方当前调试器处在什么阶段(我用的是阶段而非状态,状态我将用于session阶段中调试器的情况描述)。query方法则是请求服务端获取请求结果并更改调试器阶段信息。于是调用方只要调用query方法就可以发起调试命令,就像调用本地方法一样。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券