前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

python TCP套接字服务器v1.1-新增服务端命令功能及修改bug(socket+PyQt5)

作者头像
zmh-program
发布2023-02-06 09:53:08
4580
发布2023-02-06 09:53:08
举报

TCP聊天服务器套接字v1.1

所有版本记录: v1.0 : TCP聊天服务器套接字|PyQt5+socket(TCP端口映射+端口放行)+logging+Thread(含日志,html)+anaconda打包32位exe(3.4万字)|python高阶

文章目录

| 1. 服务器代码改进 / bug改进

(1).发送函数改为@function

class Server():
	...
    def send(self, sock, user, mes):
        self.QUIT(user, lambda: sock.sendall(mes.encode(self.encode)))()
    def send(self, sock, user, mes):
        @self.QUIT(user)
        def func():
            sock.sendall(mes.encode(self.encode))
        return func()

(2).异常运行函数改为三叠函数

    def QUIT(self, user, command):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def logs(*args, **kargs):
            try:
                command(*args, **kargs)
            except:
                self.errs.append(sock)
        return logs
    def QUIT(self, user):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def prop(command, *args, **kwargs):
            def logs(*args, **kwargs):
                try:
                    command(*args, **kwargs)
                    return True
                except:
                    logging.exception(str())
                    if self.errduring is False:
                        self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
                    return False
            return logs
        return prop

(3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError

bug改进方法:

增加errduring; inc参数

    def ServerMessage(self, mes):
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
    def UserMessage(self, address, _user, mes):
        if not mes:
            return
        mes = mes.decode(encoding="utf8")
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        self.error_handle()
    def error_handle(self):
        for sock in self.errs:
            sock.close()
            logger.exception(msg = str())
            Q = Server.quit_message % user
            logger.info(Q)
            self.connect.pop(sock)
            self.ServerMessage(Q)
    def ServerMessage(self, mes, inc=True):
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
        if inc:
            self.error_handle()
        self.errduring = False
        
    def UserMessage(self, address, _user, mes,inc=True):
        if not mes:
            return
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        if inc:
            self.error_handle()
        self.errduring = False
        
    def error_handle(self):
        for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
            if user in list(self.connect.values()):
                sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
                logger.exception(msg = str())
                Q = Server.quit_message % (user, self._str_sockets())
                logger.info(Q)
                self.connect.pop(sock) 
                self.ServerMessage(Q, False) #防止多次调用error_handle函数
                sock.close()

(4)获取真正本机的ip地址

如果直接用socket.gethostbyname(socket.gethostname())获取地址,很有可能是错误的(Vmware虚拟机的地址127.0.0.1等)

搜索后我得到了下面这段精巧的代码:

...
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

通过UDP尝试连接8.8.8.8:80,不管是否连接成功,获得的本机IP一定是正确的。在Ubuntu(kivydev)和Windows上都可用,还省去了判断操作系统的大段代码.

那么代码可以改为:

server = Server(socket.gethostname(),429)
server = Server(get_host_ip(),429)

| 2.新增命令功能 (在输入框添加"/")

class Command_Handler(object):
    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind
    def _function(self, _list, client):
        
        data = {"/info" : {"-v": self.get_version(),
                           "-id" : self.get_id(client),
                           "-i" : self.info(),
                           "-h" : self.help()},
                }
        _dict = data
        for n in range(len(_list)):
            if type(_dict) == dict:
                _dict = _dict.get(_list[n],  self.unknown(" ".join(_list)))
            else:
                break
        if  type(_dict) == dict:
             _dict = "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys()) 
        return _dict
    @staticmethod
    def help():
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
    @staticmethod
    def get_version():
        return "version : " + str(__version__)
    def get_id(self, client):
        return "Your id is {}.".format(id(client))
    
    def info(self):
        return f"Socket Server[version {self.get_version()}] By zmh."
    def unknown(self,s):
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
    def cut(self, string):
        return string.strip().split()
    def handler(self, c, client=None):
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>\n%s"  % (c,str(self._function(self.cut(c),client)))
    def iscommand(self,i):
        return i.strip().startswith("/")
class Server():
    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
		...
        self.com = Command_Handler(self)
        self.encode = encode
        self.errs = []
        self.errduring = False

| 3.新增cmd控制台颜色改变

我的文章: Python 命令行cmd指定颜色设置

#color.py

import ctypes,time
STD_INPUT_HANDLE = -10  
STD_OUTPUT_HANDLE= -11  
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
    DARKBLUE = 0x01 # 暗蓝色
    DARKGREEN = 0x02 # 暗绿色
    DARKSKYBLUE = 0x03 # 暗天蓝色
    DARKRED = 0x04 # 暗红色
    DARKPINK = 0x05 # 暗粉红色
    DARKYELLOW = 0x06 # 暗黄色
    DARKWHITE = 0x07 # 暗白色
    DARKGRAY = 0x08 # 暗灰色/亮
    BLUE = 0x09 # 蓝色
    GREEN = 0x0a # 绿色
    SKYBLUE = 0x0b # 天蓝色
    RED = 0x0c # 红色
    PINK = 0x0d # 粉红色
    YELLOW = 0x0e # 黄色
    WHITE = 0x0f # 白色

class Background():
    BLUE     = 0x10 # 蓝
    GREEN    = 0x20 # 绿
    RED      = 0x40  # 红
    INTENSITY = 0x80 # 亮
 
def SetCmdColor(color, handle=std_out_handle):
    Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool
 
def Print(color:int,*args, **kwargs):
    SetCmdColor(color)
    print(*args, **kwargs)
    SetCmdColor(Text.DARKWHITE)
#server.py
from color import Text, Background, Print

| 4.客户端PyQt5信息过快使Textedit刷新空白

增加延迟呗

...
    def Show_Message(self, data):
        if data:
            for i in data.split('\n'):
                sleep(0.2 / len(data.split('\n'))) #防止信息过快使Textedit刷新空白
                self.textEdit_2.append(i)
                print(i)

| 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后

    def Show_Message(self, data):
        if data:
			...
        self.textEdit_2.moveCursor(QtGui.QTextCursor.End)

| 6.服务端socket多次连接不同地址错误

OSError: [WinError 10022] 提供了一个无效的参数.

    @to_logging
    def socket_connect(self):
        if hasattr(self, "_socc"):
            self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
        else:
            self._socc = True
       self.socket.connect(self.addr)

| 7. 完整代码

客户端:

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui files 'USER.ui', 'Connect.ui'
#
# Created by: PyQt5 UI code generator 5.15.4
#
# WARNING: Any manual changes made to this file will be lost when pyuic5 is
# run again.  Do not edit this file unless you know what you are doing.


from PyQt5 import QtCore, QtGui, QtWidgets
import socket, sys, logging
from traceback import format_exc
from datetime import datetime
from time import sleep
from threading import Thread
from random import randint as rand
import logging  # 引入logging模块
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
Username = f'zmh {rand(0,1000)}'
TIMEOUT = 3
IP = "114.67.206.19"
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip
selfIP = get_host_ip()
bytecount = 1024
dicts = {f"{IP} (公网)" : {"ip":IP, "port":16223},
         f"{selfIP} (私网)" : {"ip":selfIP,"port":429},
         "EXAM-41 (微机室)" : {"ip":"EXAM-41","port":429}}
get_time = lambda: datetime.now().strftime('%Y %m %d %H:%M:%S')
def threading(Daemon, **kwargs):
        thread = Thread(**kwargs)
        thread.setDaemon(Daemon)
        thread.start()
        return thread

def to_logging(command):
    def logs(*args, **kwargs):
        try:
            command(*args, **kwargs)
        except Exception as e:
            if "main" in dir():
                main.Show_Message(format_exc())
            else:
                logging.exception(str())
            return False
        else:
            return True
    return logs

class Socket:
    def __init__(self,Function=lambda i:None,code='utf-8'):
        self.socket = socket.socket()
        self.code = code
        self._logger = Function
        self.socket.settimeout(TIMEOUT)
        self._connect = False
    def set_func(self, f):
        self._logger = f

    def retry(self):
        del self.socket
        self.socket = socket.socket()
        self.socket.settimeout(TIMEOUT)
    @to_logging
    def socket_connect(self):
        if hasattr(self, "_socc"):
            self.retry() #socket多次连接不同地址会出现 `OSError: [WinError 10022] 提供了一个无效的参数.`
        else:
            self._socc = True
        self.socket.connect(self.addr)
    def connect(self, ip = None,port:int=0000, show=None):
        self.addr = (ip, port)   
        if not self.socket_connect():
                show("[{}]: 连接服务器[{}]失败".format(get_time(),self.addr[0]))
                return False
        else:
                show("[{}]: 连接成功".format(get_time()))
                return True
    def _handler(self):
            self.socket.send(Username.encode(self.code))
            while True:
                try:
                    byte = self.socket.recv(bytecount **2)
                    if len(byte) == 0:
                            break
                    kb = len(byte) / bytecount
                    self._logger(f'[{datetime.now().strftime("%H:%M:%S")}]{byte.decode(encoding=self.code)}                  {"<font size=1>%0.2f kb</font>" % kb}')
                except Exception as e:
                    if not type(e) == socket.timeout:
                        for n in ["","ERROR".center(20,"-")]+format_exc().split('\n')+["".center(20,"-"),""]:
                            self._logger(f"<font color='red'>{n}</font>")
                        self.socket.close()
                        return
    def run(self): #线程
        self._connect = True
        self.thread = threading(False, target=self._handler)

class Ui_Dialog(object):
    def __init__(self):
        Dialog = QtWidgets.QDialog()
        self.Dialog = Dialog
        Dialog.setObjectName("Dialog")
        Dialog.resize(495, 81)
        self.gridLayout = QtWidgets.QGridLayout(Dialog)
        self.gridLayout.setObjectName("gridLayout")
        self.comboBox = QtWidgets.QComboBox(Dialog)
        self.comboBox.setObjectName("comboBox")
        self.comboBox.addItems(list(dicts.keys()))
        self.gridLayout.addWidget(self.comboBox, 0, 1, 1, 1)
        self.pushButton = QtWidgets.QPushButton(Dialog)
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 0, 2, 1, 1)
        self.label = QtWidgets.QLabel(Dialog)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        self.label.setFont(font)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(Dialog)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 1, 0, 2, 3)

        self.retranslateUi(Dialog)
        self.pushButton.clicked.connect(self._connect)
        QtCore.QMetaObject.connectSlotsByName(Dialog)
        self.num = 0
    def retranslateUi(self, Dialog):
        _translate = QtCore.QCoreApplication.translate
        Dialog.setWindowTitle(_translate("Dialog", "Dialog"))
        self.comboBox.setItemText(0, _translate("Dialog", list(dicts.keys())[0]))
        self.pushButton.setText(_translate("Dialog", "Connecting"))
        self.label.setText(_translate("Dialog", "Select Socket Connect Address:"))
        self.set_text("No return value.")
        Dialog.show()
    def set_text(self, m):
        self.lineEdit.setText(QtCore.QCoreApplication.translate("Dialog", str(m)))
        self.lineEdit.update()
    @to_logging
    def _connect(self, i):
        addr = dicts[self.comboBox.currentText()]
        self.set_text("[{}]: 尝试连接服务器[{}],最大超时报错 {}s".format(datetime.now().strftime('%Y %m %d %H:%M:%S'),addr["ip"],TIMEOUT))
        if s.connect(**addr, show=self.set_text):
            global main
            main = Ui_MainWindow()
            main.show()
            def close(widget):
                sleep(1)
                widget.close()
            threading(False, target=close, args=(self.Dialog, ))
class Ui_MainWindow(object):
    def __init__(self):
        MainWindow = QtWidgets.QMainWindow()
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 619)
        font = QtGui.QFont()
        font.setFamily("Consolas")
        MainWindow.setFont(font)
        self.centralwidget = QtWidgets.QWidget(MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.lineEdit_3 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_3.setReadOnly(True)
        self.lineEdit_3.setObjectName("lineEdit_3")
        self.gridLayout.addWidget(self.lineEdit_3, 7, 3, 1, 1)
        self.pushButton = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton.setStyleSheet("background-color:rgb(44, 176, 13);\n"
"color:rgb(255, 255, 255);\n"
"font: 200 10pt \"Consolas\";")
        self.pushButton.setObjectName("pushButton")
        self.gridLayout.addWidget(self.pushButton, 8, 6, 1, 1)
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem, 8, 5, 1, 1)
        self.label_2 = QtWidgets.QLabel(self.centralwidget)
        self.label_2.setObjectName("label_2")
        self.gridLayout.addWidget(self.label_2, 6, 1, 1, 1)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.gridLayout.addItem(spacerItem1, 8, 3, 1, 1)
        self.lineEdit_2 = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit_2.setReadOnly(True)
        self.lineEdit_2.setObjectName("lineEdit_2")
        self.gridLayout.addWidget(self.lineEdit_2, 6, 3, 1, 1)
        self.label = QtWidgets.QLabel(self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 5, 1, 1, 1)
        self.lineEdit = QtWidgets.QLineEdit(self.centralwidget)
        self.lineEdit.setCursor(QtGui.QCursor(QtCore.Qt.IBeamCursor))
        self.lineEdit.setDragEnabled(False)
        self.lineEdit.setReadOnly(True)
        self.lineEdit.setObjectName("lineEdit")
        self.gridLayout.addWidget(self.lineEdit, 5, 3, 1, 1)
        self.label_3 = QtWidgets.QLabel(self.centralwidget)
        self.label_3.setObjectName("label_3")
        self.gridLayout.addWidget(self.label_3, 7, 1, 1, 1)
        self.line = QtWidgets.QFrame(self.centralwidget)
        self.line.setFrameShape(QtWidgets.QFrame.VLine)
        self.line.setFrameShadow(QtWidgets.QFrame.Sunken)
        self.line.setObjectName("line")
        self.gridLayout.addWidget(self.line, 5, 4, 3, 1)
        self.textEdit = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit.setObjectName("textEdit")
        self.gridLayout.addWidget(self.textEdit, 5, 5, 3, 2)
        spacerItem2 = QtWidgets.QSpacerItem(20, 40, QtWidgets.QSizePolicy.Minimum, QtWidgets.QSizePolicy.Expanding)
        self.gridLayout.addItem(spacerItem2, 1, 1, 1, 1)
        self.textEdit_2 = QtWidgets.QTextEdit(self.centralwidget)
        self.textEdit_2.setObjectName("textEdit_2")
        self.textEdit_2.setReadOnly(True)
        self.gridLayout.addWidget(self.textEdit_2, 0, 3, 2, 4)
        self.pushButton_2 = QtWidgets.QPushButton(self.centralwidget)
        self.pushButton_2.setObjectName("pushButton_2")
        self.gridLayout.addWidget(self.pushButton_2, 8, 1, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 24))
        self.menubar.setObjectName("menubar")
        self.menu = QtWidgets.QMenu(self.menubar)
        self.menu.setObjectName("menu")
        self.menulanguage = QtWidgets.QMenu(self.menu)
        self.menulanguage.setObjectName("menulanguage")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)
        self.actionsocket_connet = QtWidgets.QAction(MainWindow)
        self.actionsocket_connet.setObjectName("actionsocket_connet")
        self.actionChinese = QtWidgets.QAction(MainWindow)
        self.actionChinese.setObjectName("actionChinese")
        self.actionip_socket_gethostbyname_socket_gethostname = QtWidgets.QAction(MainWindow)
        self.actionip_socket_gethostbyname_socket_gethostname.setObjectName("actionip_socket_gethostbyname_socket_gethostname")
        self.menulanguage.addSeparator()
        self.menulanguage.addAction(self.actionChinese)
        self.menu.addSeparator()
        self.menu.addAction(self.menulanguage.menuAction())
        self.menu.addAction(self.actionip_socket_gethostbyname_socket_gethostname)
        self.menubar.addAction(self.menu.menuAction())
        self.socket_peername = s.addr[0]
        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)
        self.MainWindow = MainWindow
        self.pushButton.clicked.connect(self.send)
    @to_logging
    def sendmsg(self):
        data = self.textEdit.toPlainText().strip()
        if data:
            s.socket.send(data.encode('utf8'));self.textEdit.clear()
    @to_logging
    def send(self, i):
        if hasattr(s,"_connect") and s._connect:
            if not self.sendmsg():
                QtWidgets.QMessageBox.information(self.MainWindow, 'TraceBack',f'Socket Server<{self.socket_peername}> 断开连接')
                s._connect = False
        else:
            self.Show_Message("<font color='red'>发送异常. 未连接至服务器.请点击[重新连接服务器]按钮尝试重新连接.</font>")
    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "Socket"))
        self.lineEdit_2.setText(socket.gethostname())
        self.lineEdit.setText(socket.gethostbyname(socket.gethostname()))
        self.lineEdit_3.setText(self.socket_peername)
        self.pushButton.setText(_translate("MainWindow", "send"))
        self.label_2.setText(_translate("MainWindow", "主机名:"))
        self.label.setText(_translate("MainWindow", "本地端口:"))
        self.label_3.setText(_translate("MainWindow", "连接端口:"))
        self.pushButton_2.setText(_translate("MainWindow", "重新连接服务器"))
        self.menu.setTitle(_translate("MainWindow", "设置"))
        self.menulanguage.setTitle(_translate("MainWindow", "language"))
        self.actionsocket_connet.setText(_translate("MainWindow", "socket connect"))
        self.actionChinese.setText(_translate("MainWindow", "Chinese"))
        self.actionip_socket_gethostbyname_socket_gethostname.setText(_translate("MainWindow", "ip: "+socket.gethostbyname(socket.gethostname()) ))

        s.set_func(self.Show_Message)
        s.run()
    def show(self):
        self.MainWindow.show()
    def Show_Message(self, data):
        if data:
            for i in data.split('\n'):
                sleep(0.2 / len(data.split('\n'))) #防止信息过快使Textedit刷新空白
                self.textEdit_2.append(i)
                print(i)
        self.textEdit_2.moveCursor(QtGui.QTextCursor.End)
if __name__ == "__main__":
        app = QtWidgets.QApplication(sys.argv)
        s = Socket()
        conn = Ui_Dialog()
        sys.exit(app.exec_())

服务端:

server.py

import socket # 导入 socket 模块
from threading import Thread
import logging
from color import Text, Background, Print
__version__ = 1.1
def threading(Daemon, **kwargs):
        thread = Thread(**kwargs)
        thread.setDaemon(Daemon)
        thread.start()
        return thread




logger = logging.getLogger(__name__)
logger.setLevel(level = logging.INFO)
handler = logging.FileHandler("log.txt")
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
logger.addHandler(handler)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(handler)
logger.addHandler(console)
bytecount = 1024
class Command_Handler(object):
    def __init__(self, bind=None):
        """Bind Server class"""
        self.bind = bind
    def _function(self, _list, client):
        
        data = {"/info" : {"-v": self.get_version(),
                           "-id" : self.get_id(client),
                           "-i" : self.info(),
                           "-h" : self.help()},
                }
        _dict = data
        for n in range(len(_list)):
            if type(_dict) == dict:
                _dict = _dict.get(_list[n],  self.unknown(" ".join(_list)))
            else:
                break
        if  type(_dict) == dict:
             _dict = "Error:\n<font color='blue'>This command must take more arguments. Such as %s.</font>" % list(_dict.keys()) 
        return _dict
    @staticmethod
    def help():
        return """/info [-v] [-id] [-i]
-v : get version of program.
-id : get your id.
-i : get information.
-h : help.
For example, <font color=red>/info -id</font>"""
    @staticmethod
    def get_version():
        return "version : " + str(__version__)
    def get_id(self, client):
        return "Your id is {}.".format(id(client))
    
    def info(self):
        return f"Socket Server[version {self.get_version()}] By zmh."
    def unknown(self,s):
        return """Error:
No command named "%s". Please search [/info -h] to help.
%s""" % (s, self.help())
    def cut(self, string):
        return string.strip().split()
    def handler(self, c, client=None):
        return "<font color='gray'>[command] </font><font color='brown'>%s</font>\n%s"  % (c,str(self._function(self.cut(c),client)))
    def iscommand(self,i):
        return i.strip().startswith("/")
class Server(object):
    join_message = "<font color='red'>Server></font> <font color='blue'>%s</font> 连接服务器. 当前在线人数: <font color='red'>%s</font>"
    user_message = "<font color='%s'>%s(%s端)%s></font> %s"
    quit_message = "%s 下线了, %s"
    def __init__(self, addr, port, backlog = 10, encode = 'utf8'):
        self.address = addr, port
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.bind(self.address)
        self.socket.listen(backlog)
        self.connect = {}
        self.com = Command_Handler(self)
        self.encode = encode
        self.errs = []
        self.errduring = False
    def run(self):
        logger.info("服务器[%s]开启,端口[%s]" % (socket.gethostbyname(socket.gethostname()), self.address[1]))
        self.accept_client()
    def _get_sockets(self): #return int
        return len(self.connect.items())
    def _str_sockets(self):
        return f"当前人数 {self._get_sockets()}"
    def send(self, sock, user, mes):
        @self.QUIT(user)
        def func():
            sock.sendall(mes.encode(self.encode))
        return func()
    def QUIT(self, user):
        sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
        def prop(command, *args, **kwargs):
            def logs(*args, **kwargs):
                try:
                    command(*args, **kwargs)
                    return True
                except:
                    logging.exception(str())
                    if self.errduring is False:
                        self.errs.append((sock,user)) #在遍历时列表增加 -> 触发RuntimeError
                    return False
            return logs
        return prop
    def ServerMessage(self, mes, inc=True):
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                self.send(sock, user, mes)
        if inc:
            self.error_handle()
        self.errduring = False
    def UserMessage(self, address, _user, mes,inc=True):
        if not mes:
            return
        self.errduring = True
        for sock, user in self.connect.items():
            if not sock in self.errs:
                send_message = Server.user_message % ("brown" if _user == user else "red",
                                                      _user,
                                                      address,
                                                      "(我自己)" if _user == user else "",
                                                      mes)
                self.send(sock, user, send_message)
        logger.info(f"{address}[{_user}] : {mes}")
        if inc:
            self.error_handle()
        self.errduring = False
    def error_handle(self):
        for _, user in self.errs: #使用变量_ : 为了防止触发KeyError: <socket.socket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
            if user in list(self.connect.values()):
                sock = list(self.connect.keys())[list(self.connect.values()).index(user)]
                logger.exception(msg = str())
                Q = Server.quit_message % (user, self._str_sockets())
                logger.info(Q)
                self.connect.pop(sock) 
                self.ServerMessage(Q, False) #防止多次调用error_handle函数
                sock.close()
    def accept_client(self):
      while True:
        client, address = self.socket.accept() # 阻塞,等待客户端连接
        thread = threading(Daemon=True, target=self.message_handle, args=(client,address[0]))

    def message_handle(self,client,address):
        username = client.recv(1024).decode(encoding='utf8')
        self.connect[client] = username
        logger.info(f"{address}[{username}] 加入服务器 , " + self._str_sockets())
        self.ServerMessage(Server.join_message % (address, self._get_sockets()) )
        @self.QUIT(username)
        def update():
            string = client.recv(bytecount **2).decode(encoding="utf8")
            Print(Text.BLUE,f"{username}>{string}")
            if self.com.iscommand(string):
                self.send(client, username, self.com.handler(string))
            else:
                self.UserMessage(address,username,string)
        while True:
            status = update()
            if not status:
                break
def get_host_ip() -> str:
    """get current IP address"""
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip

server = Server(get_host_ip(),429)
server.run()
sys.exit(app.exec_())

color.py

import ctypes,time
  
STD_INPUT_HANDLE = -10  
STD_OUTPUT_HANDLE= -11  
STD_ERROR_HANDLE = -12
std_out_handle=ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)
class Text():
    DARKBLUE = 0x01 # 暗蓝色
    DARKGREEN = 0x02 # 暗绿色
    DARKSKYBLUE = 0x03 # 暗天蓝色
    DARKRED = 0x04 # 暗红色
    DARKPINK = 0x05 # 暗粉红色
    DARKYELLOW = 0x06 # 暗黄色
    DARKWHITE = 0x07 # 暗白色
    DARKGRAY = 0x08 # 暗灰色/亮
    BLUE = 0x09 # 蓝色
    GREEN = 0x0a # 绿色
    SKYBLUE = 0x0b # 天蓝色
    RED = 0x0c # 红色
    PINK = 0x0d # 粉红色
    YELLOW = 0x0e # 黄色
    WHITE = 0x0f # 白色

class Background():
    BLUE     = 0x10 # 蓝
    GREEN    = 0x20 # 绿
    RED      = 0x40  # 红
    INTENSITY = 0x80 # 亮
 
def SetCmdColor(color, handle=std_out_handle):
    Bool=ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
    return Bool
 
def Print(color:int,*args, **kwargs):
    SetCmdColor(color)
    print(*args, **kwargs)
    SetCmdColor(Text.DARKWHITE)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-04-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • | 1. 服务器代码改进 / bug改进
    • (1).发送函数改为@function
      • (2).异常运行函数改为三叠函数
        • (3).服务端在下线时列表在遍历时 增加下线的服务端 -> 触发RuntimeError
          • (4)获取真正本机的ip地址
          • | 2.新增命令功能 (在输入框添加"/")
          • | 3.新增cmd控制台颜色改变
          • | 4.客户端PyQt5信息过快使Textedit刷新空白
          • | 5.PyQt5 消息超过一页下拉浪费时间: 指针至最后
          • | 6.服务端socket多次连接不同地址错误
          • | 7. 完整代码
          相关产品与服务
          命令行工具
          腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档