前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 分隔列队机制完美解决TCP粘包\分包问题

python 分隔列队机制完美解决TCP粘包\分包问题

作者头像
zmh-program
发布2023-02-06 10:00:40
4130
发布2023-02-06 10:00:40
举报
文章被收录于专栏:信息技术博客信息技术博客

通常, TCP接收为210字节(1024 bytes, 213bits), 包括了4 字节的消息头和 1020字节的消息. 那如果超出这个范围呢? 分多次发, python提供了一个接口python.socket.sendall(bytes)。 在套接字中, 由于TCP的优化 Nagle算法机制或者接受最大值(MSS) < 应接收的值,出现粘包,分包现象 将多次间隔较小、数据量较小的数据,合并成一个数据量大的数据块,然后进行封包。那么这样一来,接收端就必须使用高效科学的拆包机制来分辨这些数据。 (如图1)

请添加图片描述
请添加图片描述

解决此问题, 可以将发送的内容转换repr (‘something\n’ -> ‘something\n"’), 并添加分隔符. 解析的时候, 通过分割分隔符, 组成列队Queue, 先出后进. 如果其中有分隔符, 那么其中的内容进入ReadyQueue,如果分割最后无分隔符, 则进入等待WaitKey, 等下次分割出的第一个相结合, 进入ReadyQueue. (如图2)

情况一 接收 "data1\n" 这是一个完整的数据包, 分割出来["data1", ""] 会将"data1"与前面waitKey将(初始化为"")结合进入列队. 并将最后面的""设为waitKey

情况二 接收ata2 这是分包导致的,分割得出["ata2"]将其与前面waitKey结合,不进入列队,等待分隔符。将下一次含有分隔符前端的数据结合进入列队。

情况三 接收\ndata3\ndata4\nd 这是粘包导致的后面还加带了一点点数据 分割得出["", "data3", "data4", "d"]

前面的waitKey与第一个空字符结合,进入列队 带data3与data4进入列队 waitKey设为第四个"d" .

请添加图片描述
请添加图片描述

怎么用python解决呢?

导入库 socket, threading

文章目录

导入

代码语言:javascript
复制
import socket
from threading import Thread

| SocketHandler类

代码语言:javascript
复制
class SocketHandler(object): #由于 accept为被动接受, 所以不继承 socket.socket
	split_text = "\n"  # 类变量, 默认分隔符为回车(\n)
·初始化
代码语言:javascript
复制
def to_thread(target, Daemon=True) -> callable:
    def run(*args, name=str()) -> Thread:
        thread = Thread(target=target, args=args)
        thread.setDaemon(Daemon)
        if name:
            thread.setName(name)
        return thread

    return run

class SocketHandler(object):
# ...
    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.run()

    def run(self):
        self.forever_receive(name=f"客户端{socket}").start()

    @to_thread
    def forever_receive(self) -> (str, None):
        while self.isOpen():
            data = self.__recv()
            if isinstance(data, bytes) and data:
                self.parse_data(self.handle(data))
                continue
            elif data is ConnectionError:
                return
·接收以及异常处理
代码语言:javascript
复制
def ignore(function):
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except:
            pass

    return func

class SocketHandler(object):
# ...
    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        pass

    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()

    def __recv(self) -> (bytes, ConnectionError):
        try:
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将\n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False
    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
            pass
        else:
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port))

△ 解析(重点)

代码语言:javascript
复制
    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore  # assert bool(list)
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加. 
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0)) #将原先的等待值
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self) -> str:
        while not self.ReadyQueue:
            pass
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

| 测试

代码语言:javascript
复制
import time


class Debugger:
    addr = ("127.0.0.1", 429)

    s = socket.socket()
    s.bind(addr)
    s.listen(10)
    user = SocketHandler(bufsize=20, run=False)
    user.connect(*addr)
    server = SocketHandler(s.accept()[0], run=False)
    user.run()

    def __init__(self):
        self.IO()

    def IO(self):
        while True:
            self.server.send(time.time())
            self.server.send(time.time())
            print(self.user.recv_list())
            time.sleep(3)


debug = Debugger()
debug.IO()

| 封装

代码语言:javascript
复制
import socket


def ignore(function):
    def func(*args, **kwargs):
        try:
            return function(*args, **kwargs)
        except:
            pass

    return func


class SocketHandler(object):
    split_text = "\n"  # 类变量, 默认分隔符为回车(\n)

    def __init__(self, socket=socket.socket(), bufsize=1024, codec="utf8", run=True):
        self.socket, self.bufsize, self.codec = socket, bufsize, codec
        self.waitKey = str()
        self.ReadyQueue = []
        self._closed = False
        if run:
            self.forever_receive()

    def __del__(self):
        self.quit()

    def isOpen(self) -> bool:
        return not (self._closed and getattr(self.socket, "_closed", False))

    def quitEvent(self) -> None:
        pass

    def quit(self) -> None:
        self._closed = True
        self.quitEvent()
        self.socket.close()

    def __recv(self) -> (bytes, ConnectionError):
        try:
            return self.socket.recv(self.bufsize).strip(b" ")  # str.strip()不可用! 会将\n省略
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return ConnectionError

    def __send(self, data: bytes) -> bool:
        try:
            self.socket.sendall(data)
            return True
        except (ConnectionAbortedError, ConnectionRefusedError, ConnectionResetError, OSError) as e:
            self.quit()
            return False

    def send(self, data) -> bool:
        if isinstance(data, str):
            data = data.encode(self.codec)
        elif isinstance(data, (set, list, tuple)):
            data = repr(data)
        elif isinstance(data, (int, float)):
            data = str(data).encode(self.codec)
        elif isinstance(data, bytes):
            pass
        else:
            data = bytes(data)
        return self.__send(data + self.split_text.encode(self.codec))

    def forever_receive(self) -> (str, None):
        while self.isOpen():
            self.receive_datas()
            
    def receive_datas(self) -> bool:
        data = self.__recv()
        if isinstance(data, bytes) and data:
            self.parse_data(self.handle(data))
            return True
        elif data is ConnectionError:
            return False

    def handle(self, data: bytes):
        return [d.strip() for d in data.decode(self.codec).split(self.split_text)]

    @ignore  
    def parse_data(self, generator: (tuple, list, set)) -> None:
        generator = list(generator)
        if len(generator) == 1:  # 列表为1, 表明无间隔符, 则在等待中添加.
            self.waitKey += generator[0]
            return
        self.ReadyQueue.append(self.waitKey + generator.pop(0))
        self.waitKey = generator.pop()
        self.ReadyQueue.extend(generator)

    def recv(self) -> str:
        while not self.ReadyQueue:
            self.receive_datas()
        return self.ReadyQueue.pop(0)

    def recv_list(self) -> list:
        queue = self.ReadyQueue[:]
        self.ReadyQueue = []
        return queue

    @ignore
    def connect(self, host: str, port: int):
        assert 0 <= port <= (2 ** 16) - 1
        self.socket.connect((host, port)) 
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 导入
  • | SocketHandler类
    • △ 解析(重点)
    • | 测试
    • | 封装
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档