首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python自动化开发学习10

Python自动化开发学习10

作者头像
py3study
发布2020-01-08 10:51:38
9940
发布2020-01-08 10:51:38
举报
文章被收录于专栏:python3python3

多线程的使用场景

上次讲了由于GIL锁的存在,Python的多线程是假的,用的还是CPU的单核。Python的多线程只是利用了CPU的上下文切换,上下分切换也是占用CPU的。那么什么时候用多行程? Python的多线程,适合IO密集型的任务,不适合CPU密集型的任务。 IO操作不占用CPU,比如socket这种网络编程的情景。 计算占用CPU,所以大量计算的情景下多线程反而更慢,额外消耗了CPU切换上下文的计算。

多进程

多进程的基本语法和多线程差不多:

import multiprocessing
import time

def show(name):
    time.sleep(2)
    print('hello', name)

if __name__ == '__main__':
    p = multiprocessing.Process(target=show, args=('Jack',))
    p.start()
    p.join()  # join的效果就是等待子进程执行完毕
    print('执行结束')

上面的例子,只模块名变了,其他都和多线程差不多。

父进程和子进程的id号

下面的例子打印了进程的id号:

import multiprocessing
import os, time

def info(title):
    print(title, 'module name:', __name__)  # 模块名
    time.sleep(0.3)  # 加点停顿,可以看出来,所有进程真的是并行处理的
    print(title, 'parent process:', os.getppid())  # 父进程号
    time.sleep(0.3)
    print(title, 'process id:', os.getpid())  # 进程号

def f(title):
    info(title)

if __name__ == '__main__':
    info('main')
    for i in range(10):  # 这次起10个进程
        p = multiprocessing.Process(target=f, args=('p%s' % i,))
        p.start()

可以适当修改加长info的延时,,可以去系统里查看一下所有进程的情况,如下图:

Python自动化开发学习10
Python自动化开发学习10

上面起了10个子进程,加上主进程,一个11个python进程。 我是用pycharm执行的代码,主进程的ID是8036,主进程的父进程是pycharm7832。 然后,所有的子进程,都是通过8036这个python的父进程开启的。8036就是这些子进程的父进程。

进程间通讯

进程间的内存是独立的,如果进程间需要交换数据,就需要借助其他方法

通过队列 - Queue

下面的例子通过Queue实现了进程间的通信,

from multiprocessing import Process, Queue
import time

def f(q):
    q.put([42, None, 'hello'])
    time.sleep(30)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    time.sleep(30)

这里的Queue是 multiprocessing 多线程模块里的,不是之前的独立的queue模块。

通过管道 - Pipe

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # 实例化管道后会生成2个对象,这2个对象是一样的
    p = Process(target=f, args=(child_conn,))  # 把管道的1头交给子进程,自己操作另外一头
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"

上面实例化管道之后的两个对象是一样的,不要被名字误导,这么取名是随进程的关系。 实例化后的两个对象就是管道的两头。通讯的双方任意各取一头操作,就能实现管道两头的通讯。这里是父进程留下一头,把另一头传递给子进程操作。 这里管道的操作使用send和recv,类似socket(不过没有黏包)。这边send一次对端就recv一次获取数据,如果一边send多次,那么对端也只能一次recv取到一次的数据,所以也得recv多次才能取到全部的数据。如果数据取完了,再recv则会阻塞,等待管道对端send数据进来。

通过 Manager 共享

上面的2个方法,只是实现了数据的传递。 通过Manager生成的数据对象,可以在多个进程间共享。以下的数据类型都可以通过Manager来生成,实现共享: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array 举例说明:

from multiprocessing import Process, Manager
import os

def f(d, l):
    pid = os.getpid()
    d['pid'] = pid  # 每次这个key都会被重写,最后打印的一定是最后一个操作的进程的结果
    d[pid] = pid  # 每次都是生成一个新的key,添加到字典里
    l.append(pid)  # 每次往列表中添加一个元素

if __name__ == '__main__':
    with Manager() as manager:  # 等于manager = Manager(),最后还省了一个manager.close()
        d = manager.dict()  # 生成一个共享的字典
        l = manager.list()  # 生成一个共享的列表
        p_list = []
        for i in range(10):
            p = Process(target=f, args=(d, l))
            p.start()
            p_list.append(p)
        for res in p_list:
            res.join()
        print(d)
        print(l)

进程锁 Lock

线程有线程锁,进程也有进程锁。用法也和线程锁一样,如下:

from multiprocessing import Process, Lock
import time

def f(l, n):
    l.acquire()
    print('hello world', n, end='', flush=True)
    time.sleep(0.1)
    print(" Finished", n)
    l.release()
    time.sleep(0.1)

if __name__ == '__main__':
    lock = Lock()
    for n in range(10):
        Process(target=f, args=(lock, n)).start()

有锁,但是我们要锁什么?上面的代码就算不加锁应该也不一定会有什么问题。线程锁我们锁的是内存,因为线程共享的是同一块内存空间。进程锁锁的是资源,进程就是各种资源的集合。比如例子中的print用到的就是屏幕输出的资源,如果不加锁,可能几个进程同时想操作屏幕输出内容,那么就有可能会造成最终输出的字符错乱。不过打印数据应该是一串一次性进缓存的,应该也不会出现被插队吧。 虽然试不出字符错乱的情况,但是对两次print之间不要插入别的进程的内容。比如例子中去掉进程锁就会出现两段内容混乱的情况。 和线程锁的情况一样,加了锁之后,其实就应该是暂时变成了串行执行了。

进程池

每起一个进程,就是克隆一份父进程的基本数据给子进程用。这样开销会很大(比如内存),系统资源是有限的,无限起进程,可能会导致系统瘫痪。所以要有进程池。 学习线程的时候也有类似的问题,不过线程占用资源小,不容易导致系统瘫痪,但是一定会导致CPU频繁切换上下文导致效率反而会降低。所以要有信号量。 设定进程池,可以限制一次起的进程的数量。这点有点像信号量 进程池有两个方法:

  • apply :还是串行
  • apply_async :这个是并行

apply

先看apply的例子:

from multiprocessing import Pool
import time

def Foo(i):
    time.sleep(1)
    print("In Foo", i)

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        pool.apply(func=Foo, args=(i,))
    print('end')
    pool.close()  # 老师说进程池这里要先close再join,至于为啥不知道
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

执行的时候就是同时起5个进程(Pool中设置了进程池的大小),但是是一个一个依次执行的。执行完毕后会退出然后可以将之后的进程放入进程池等待执行。 注意最后要先close再join。可以试验一下,如果不执行close,但是直接执行join的话会报错。

apply_rsync

使用这个方法就是并行效果了,一次并行执行5个。每完成1个会再将下一个放入进程池马上执行:

from multiprocessing import Pool
import time

def Foo(i):
    time.sleep(1)
    print("In Foo", i)

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,))
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

注意:还是close和join的问题,刚才异步的时候其实不写也没问题。但是这里由于是并行的,主进程执行完毕之后如果没有join就会直接关闭了。子进程也不会再执行了,就像守护线程那样。所以这里一定要加上join,等待pool里的进程都执行完毕。然后join前必须要close。

回调函数

callback参数 :这里还有一个callback参数,函数执行完成后可以调用另一个函数,这个叫回调函数。 回调函数 :就是前面的方法执行完之后,就会自动对用执行这个回调函数。并且是由主进程调用执行的。 举例说明:

from multiprocessing import Pool
import os
import time

def Foo(i):
    time.sleep(1)
    # raise
    return os.getpid(), os.getppid()

def Bar(arg):
    print(os.getpid(), 'Foo 执行完毕,结果:', arg)

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,), callback=Bar)
    print('end')
    pool.close()
    pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

回调函数只有在函数正常执行完之后才会被调用。Foo中有一句raise,主动抛出一个错误,如果去掉注释导致函数没有正常执行完成,rais之前的print还是会正常执行,但是不会调用callback的函数执行。 另外,这里打印了每个进程id,从id中可以看到,Foo函数是由主进程启动的子进程执行的。而callback的函数是由主进程来执行的。Foo的父进程id就是Bar的进程id 回调函数的意义,主要就是因为回调函数是由主进程执行的。如果子进程的执行结果需要记录保留,那么这部分工作就通过调用回调函数,由回调函数在主进程中来处理。比如将结果写入数据库,我们就要让每个子进程都连接数据库写入数据,而是在主进程里建立一个与数据库的连接,统一将执行结果写入数据库。虽然调用的是同一个函数,但是通过回调函数调用在主进程中执行效率会更高。比如例子中的做法,Foo负责返回数据,回调函数统一打印Foo的执行结果。

协程

协程,又称微线程,纤程。英文名Coroutine。一句话说明:协程是一种用户态的轻量级线程。 协程的好处:

  • 无需线程上下文切换的开销:就是没有CPU层面的上下文切换参考多线程的实现,我们在代码中实现切换
  • 无需原子操作锁定及同步的开销:就是不用加线程锁
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何

协程的缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将CPU 的多个核用上
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

之前学习生成器的时候,通过yield实现了单线程下多并发。但是那也不是真正的协程。 协程的定义:

  1. 必须在只有一个单线程里实现并发
  2. 修改共享数据不需加锁
  3. 用户程序里自己保存多个控制流的上下文栈
  4. 一个协程遇到IO操作自动切换到其它协程

总结:使用协程就是为了高效。协程如何实现高效?一遇到IO操作就切换,因为IO操作耗时但是不占用CPU,此时切换到另一个协程,高效的利用CPU。 问题:何时切换回来?IO操作结束了就可以切换回来。如何知道IO操作结束了?往下学 ...

yield

这里写个例子,回顾一下yield的用法。但是yield并不满足我们前面对协程的定义。 下面的例子会先启动B,B会启动A。B中打印后切换到A执行,A返回后循环。A中打印后通过yield返回,循环。A和B之间通过yield和send来传递count的值,每次都自增1。

import time

def print_A():
    count = 0
    while True:
        print('A'.center(9, '-'), count)
        time.sleep(0.1)
        count = yield count+1  # 从B那里send过来的值,赋值给count。然后回到开头执行打印,把count值自增1后再返回给B

def print_B(func):
    count = next(func)  # 要先next一下,启动A,这样A会先运行一次到yield的地方返回
    while True:
        print('B'.center(9, '-'), count)
        time.sleep(0.1)
        count = func.send(count+1)  # 将值传递给A的yield,并获取A的返回值

if __name__ == '__main__':
    a = print_A()
    print_B(a)

greenlet 模块

greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator。 这个是第三方模块,所以需要安装。安装的话直接安装下面要讲的gevent模块就好了。 greenlet 模块实现的是协程的手动切换,其实就和yield差不多。不过用起来更好理解了。 gevent 模块才能实现我们要的自动切换,但是gevent是在greenlet的基础上进行了封装,实现了自动切换。所以安装的时候顺便把有依赖关系的greenlet模块一起装好了。现在自然也是先看一下greenlet模块的用法。

from greenlet import greenlet

def test1():
    print('A1')
    gr2.switch()
    print('A2')
    gr2.switch()

def test2():
    print("B1")
    gr1.switch()
    print('B2')

if __name__ == '__main__':
    gr1 = greenlet(test1)  # 启动一个协程
    gr2 = greenlet(test2)  # 再启动一个协程
    gr1.switch()  # 手动切换一下

执行一下switch,就完成了切换的操作。比yield更直观。不过重点是学下一个模块。

gevent 模块

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

import gevent

def func1():
    print("This is in func1")
    gevent.sleep(2)
    print("End of func1")

def func2():
    print("This is in func2")
    gevent.sleep(1)
    print("End of func2")

if __name__ == '__main__':
    gevent.joinall([gevent.spawn(func1),
                    gevent.spawn(func2)])
    # 上面是把要启动的方法加到列表里一起处理了,推荐就这么做
    # 其实也可以使用start和join一个一个启动,
    f1 = gevent.spawn(func1)
    f2 = gevent.spawn(func2)
    f1.start()  # 没有start,貌似也一样,注释掉试一下
    f2.start()
    f1.join()  # 没有join主线程会直接退出,就不会等待上面的协程的执行结果了
    f2.join()

要启动所有的协程,通过gevent.joinall。参数是一个列表,列表中依次启动需要进行自动切换操作的协程。并且阻塞等待所有协程处理完毕。相当于start和join 上面的gevent.sleep是一个模拟IO操作,不会像time.sleep那样停在那里,而是会当做有一个几秒的IO操作。上面的输出结果是:

 This is in func1
This is in func2
End of func2
End of func1

首先执行了func1,打印了第一行。然后之后是一个IO操作,所以切换到了下一个协程。 切换到func2,打印了一行,之后又是一个IO操作,此时再切换。不过此时已经没有可操作的协程了。没别的协程了,fun1也没好。 之后是fun2的IO操作先执行完毕,所以最终切换到fun2的时候,打印了func2的第二行,打印前会顿1秒。 最后func1的IO操作也结束了,于是切换到fun1,打印fun1的第二行,打印前会再顿1秒。

并发爬网页

这根本就不是爬虫,这里先讲如何将一个网页保存到本地,因为这就是一个比较耗时的IO操作。刚才的例子中我们是用sleep来模拟的。直接上代码就好了:

from urllib import request

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)  # 给网址发一个请求
    data = resp.read()  # 读取到的就是整个网页的内容
    with open('url.html', 'wb') as file:  # 将网页保存下来
        file.write(data)
    print('%d betes received from %s' % (len(data), url))

if __name__ == '__main__':
    f('http://www.python.org/')

然后我们来多爬几个网页,看下协程的效果。这次计算一下整个过程的时间:

from urllib import request
import gevent
import time

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)  # 给网址发一个请求
    data = resp.read()  # 读取到的就是整个网页的内容
    # with open('url.html', 'wb') as file:  # 保存网页的操作就先不用了
    #     file.write(data)
    print('%d betes received from %s' % (len(data), url))

if __name__ == '__main__':
    g_list = []
    url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/']
    for url in url_list:
        g_list.append(gevent.spawn(f, url))
    start_time = time.time()
    gevent.joinall(g_list)
    print('运行时间:', time.time()-start_time)

上面这段虽然用了gevent,但是还是串行的。到底是串行还是并行,只要看f函数开始的时候的第一句print是什么时候出现的就知道了。这里之所以是串行,是因为,并没有看到IO切换的命令,就是f函数里没有类似gevent.sleep这样的切换命令。但是,其实gevent是可以自动判断是否有IO操作的。所以这里的问题是gevent发现不了urllib模块里的IO操作。 所以真正要做的是在开头加上一句,让gevent能够发现这些IO操作。

from urllib import request
import gevent
import time

# 导入模块,添加下面这2句
from gevent import monkey
monkey.patch_all()  # 把当前程序的所有的IO操作给我单独的做上标记

def f(url):
    print('GET: %s' % url)
    resp = request.urlopen(url)  # 给网址发一个请求
    data = resp.read()  # 读取到的就是整个网页的内容
    # with open('url.html', 'wb') as file:  # 保存网页的操作就先不用了
    #     file.write(data)
    print('%d betes received from %s' % (len(data), url))

if __name__ == '__main__':
    g_list = []
    url_list = ['http://www.python.org/', 'http://www.yahoo.com/', 'http://github.com/']
    for url in url_list:
        g_list.append(gevent.spawn(f, url))
    start_time = time.time()
    gevent.joinall(g_list)
    print('运行时间:', time.time()-start_time)

高并发的socket_server

gevent里有专门的socket模块,当然其中大部分都是import原生的socket模块。我们可以使用gevent的socket实现一个单线程下高并发的socket_server。 服务端:

# 就算这里不导入socket,也会在gevent模块里导入。但是pycharm里下面的socket.socket会显示个错误,不影响运行
# 而且到下面也是要导入的,这里显示的声明一下,提前导入也不会影响效率
# import socket  # 建议导入
# 下面两句也可以不要,这里不加也能识别到socket的IO操作
# 只所以能识别,是因为下面的socket已经不是原生的socket了,而是gevent修改后的socket
# from gevent import monkey
# monkey.patch_all()

import gevent
from gevent import socket

def server(port):
    # 这里pycharm里会显示个错误,不影响运行。因为gevent的socket模块里没有socket这个方法
    # 但是其实gevent会把原生的socket全部导入的。就是运行的时候会有socket.socket这个方法
    server = socket.socket()
    server.bind(('localhost', port))
    server.listen(500)
    print("监听已经开始")
    while True:
        conn, addr = server.accept()
        print("发现连接请求:\n%s\n%s" % (conn, addr))
        gevent.spawn(handle_request, conn)

# 上面的函数建立了连接后,就将连接作为handle_request的参数,启动一个gevent的协程
# 下面的方法是通过协程启动的,是协程并发运行的
def handle_request(conn):
    while True:
        data = conn.recv(1024)
        if not data: break
        print("recv:", data.decode('utf-8'))
        conn.send(data.upper())
    conn.close()
    print("断开连接:", conn)

if __name__ == '__main__':
    server(8002)

客户端只需要之前的客户端就可以了。这里测试一下效率,起100个线程,每个线程发送100条消息:

import socket
import threading

HOST = 'localhost'  # The remote host
PORT = 8002  # The same port as used by the server
def client(i):
    client = socket.socket()
    client.connect((HOST, PORT))
    for j in range(100):
        msg = "hello %s %s" % (i, j)
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print('Received:', data.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    for i in range(100):
        t = threading.Thread(target=client, args=(i,))
        t.start()

这里有一个问题,记一下。 gevent.spawn()

事件驱动与异步IO

通常,我们写服务器处理模型的程序时,有以下几种模型:

  • 每收到一个请求,创建一个新的进程,来处理该请求
  • 每收到一个请求,创建一个新的线程,来处理该请求
  • 每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求。就是协程

三种方法各有千秋,之前应该都说过,这里就当总结一下:

  • 进程,由于创建新的进程的开销比较大,所以,会导致服务器性能比较差,但实现比较简单。
  • 线程,由于要涉及到线程的同步,有可能会面临死锁等问题。
  • 协程,在写应用程序代码时,逻辑比前面两种都复杂。

事件驱动模型,目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件(比如web页面),这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:

  1. 有一个事件(消息)队列
  2. 鼠标按下时,往这个队列中增加一个点击事件(消息)
  3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等
  4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数

事件驱动编程范式

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。 在面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)
  3. 在等待事件到来时,某些任务会阻塞

此处重申一下协程开篇提出的问题,只要一遇到IO就注册一个事件,然后主程序就可以继续干其它的事情了,直到IO处理完毕后,继续恢复之前中断的任务,这本质上是怎么实现的呢?

五种IO模式

用户空间与内核空间:现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。 缓存 I/O :又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

IO模式

对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正是因为上面的两个阶段,linux系统产生了下面五种网络模式的方案:

  • 阻塞 I/O(blocking IO) :就是最最简单的串行,效率低实现简单
  • 非阻塞 I/O(nonblocking IO) :第一阶段不会阻塞,需要反复主动发起询问数据是否准备好,第二阶段会阻塞
  • I/O 多路复用( IO multiplexing) :还是完全的阻塞,没什么优越性,但是可以同时处理多个连接。后面详讲
  • 信号驱动 I/O( signal driven IO) :略过没讲,第一阶段不阻塞,等待告知数据已经准备好,第二阶段会阻塞
  • 异步 I/O(asynchronous IO) :完全没有阻塞,看似是最好的方式。但是用的不多。asyncio模块在python3.4引入标准库,支持异步IO

五种模型的比较

Python自动化开发学习10
Python自动化开发学习10

用的最多的是IO多路复用。虽然看似异步IO更好,反正用的不多。另外多线程+阻塞模式也是一个方案,但是多线程的开销较大(相对于单线程),更适合处理少量的并发(多少算少?看你系统能起多少个线程,不过和进程比线程的开销还不算特别大)。要处理高并发,推荐还是使用IO多路复用。

IO多路复用

文件描述符(File descriptor) :是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。 I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),就通知程序进行相应的读写操作。 IO多路复用的三种机制: select :最早出现,有些缺点,优势就是几乎在所有平台上都支持。 pool :解决了部分缺点,但是本质上没多大差别,可以认为是个过度阶段。 epool :现在用这个,性能最好的。但是不是所有系统都支持,windows就不支持。

写一个非阻塞的socket

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files,and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable,或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。 写一个socket的例子来理解 注意:socket必须得运行在非阻塞模式下。之前用的都是默认的阻塞模式,阻塞模式下,如果没有数据会等待。非阻塞模式下,如果没有数据就会抛出异常。所以我们需要用select来帮我们监视

开启监听

先写到accept之前,accept之前只是设置,执行到accept在阻塞模式下会进入阻塞。我们暂时只要能收到客户端请求建立起连接就好。现在是阻塞模式,所以直接accept没有数据就会报错。所以就需要select来解决了,监视到有活动的连接再返回并继续执行accept。

import select
import socket

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞
inputs = [server, ]
outputs = []
# 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些链接
# 异常列表监视的还是链接返回的内容,那么还是在inputs里,所以第三个参数还是填inputs
# 开始什么都没有,只有server,先把server加到inputs里
# 就是select监视到server活动了,就可以返回了
# 返回3个数据,监视到有活动的3个列表(读列表,写列表,异常列表)
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
# 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
# 非阻塞socket就是通过select来实现阻塞
print(readable, writeable, exceptional)  # 下面有打印的内容
# [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []
# 第一个fd,就是文件描述符,非负整数。
conn, addr = server.accept()
print(conn, addr)

测试的客户端,只发不收:

import socket
HOST, PORT = "localhost", 9000
client = socket.socket()
client.connect((HOST, PORT))
while True:
    msg = input(">>:").strip()
    if len(msg) == 0: break
    client.send(msg.encode('utf-8'))
    # 下面2行是接收服务的返回是要用的,暂时先注释掉,到后面再启用测试返回数据
    # data = client.recv(1024)
    # print('Received:', data.decode('utf-8'))
client.close()

接收一条数据

上面链接已经可以建立起来了,那么来接收一条数据吧。这里recv之前当然还是要用select来监视是否有数据进来,有才会执行到recv。

import select
import socket

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞

inputs = [server, ]
outputs = []
# 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些链接
# 异常列表监视的还是链接返回的内容,那么还是在inputs里,所以第三个参数还是填inputs
# 如果inputs里是读的消息,会返回到readable列表里,如果是异常消息,就返回到exceptional列表里
# 开始什么都没有,只有server,先把server加到inputs里
# 就是select监视到server活动了,就可以返回了
# 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表)
readable, writeable, exceptional = select.select(inputs, outputs, inputs)
# 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
# 非阻塞socket就是通过select来实现阻塞
print(readable, writeable, exceptional)  # 打印一下监视到的活动链接
for r in readable:  # 可能一次返回多个连接啊,所以得写个循环
    conn, addr = server.accept()  # 已经是非阻塞了,不经过select监视就会报错。可以注释掉select试一下
    print(conn, addr)
    inputs.append(conn)  # 把链接加入监视列表
    # 现在conn加到了inputs的监视列表里了,就可以通过select监视conn是否有数据进来了
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    print(readable, writeable, exceptional)  # 看看打印内容,inputs里的活动链接是conn,其实也可能是server
    data = conn.recv(1024)
    print(data.decode('utf-8'))

这里可以再试一下,应该可以接收到数据。但是这里有个问题。因为现在inputs列表里监视的内容是2个了,一个是server,一个是conn。如果再监视到server活动,说明又有新链接进来,如果监视到的是conn的活动,那么才是收到数据了。所以我们要在for循环里判断收到的活动链接是谁的。

可以接收多个连接请求

再多做一步,加上一层while循环,让服务端始终处于这么一个循环之中:select返回活动链接 ==> for循环处理所有的活动链接 循环继续。

import select
import socket

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞

inputs = [server, ]
outputs = []
# 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些链接
# 异常列表监视的还是链接返回的内容,那么还是在inputs里,所以第三个参数还是填inputs
# 开始什么都没有,只有server,先把server加到inputs里
# 就是select监视到server活动了,就可以返回了
# 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表)
while True:  # select返回活动链接==>for循环处理所有的活动链接,循环往复
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    # 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
    # 非阻塞socket就是通过select来实现阻塞
    print(readable, writeable, exceptional)  # 下面有打印的内容
    # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []
    # 第一个fd,就是文件描述符,非负整数。
    for r in readable:  # 可能一次返回多个连接啊,所以得写个循环
        # 现在inputs里有server 和 conn了,
        # 如果readable返回的是server的活动,表示来了一个新链接
        # 如果readable返回的是conn的活动,表示收到了conn发来的数据
        if r is server:
            conn, addr = server.accept()  # 已经是非阻塞了,不经过select监视就会报错。可以注释掉select试一下
            print(conn, addr)
            inputs.append(conn)  # 把链接加入监视列表
        else:
            data = conn.recv(1024)
            print(data.decode('utf-8'))

现在新链接也能再连上来了,但是问题是旧的连接没有另外保存,最新连上的链接会再次赋值给conn。就链接发来的数据,导致select返回,但是会用conn去尝试recv。现在conn是新的连接,所以是空的,于是就报错。那么解决这个事情就是要保存每一个conn,就是说要再用一个列表保存所有的conn,再写一个for循环?我一开始是这么想的。 其实所有的连接都保存在inputs里了,for循环的时候就是取出每一个链接,在for循环里面应该使用变量r,而不是conn。难怪之前用conn的时候,pycharm会提示 ‘name can not be defined’ 。第一次循环的时候没有conn这个变量,不过也不会进到那个if里

将收到的数据发回给客户端

接下来继续,我们可以直接把数据发回去,这里没有阻塞的问题,就不搞了。换个方法,不直接发回去,先把消息保存到队列里。然后统一发送,这样就是发消息也是多路复用的形式。

import select
import socket
import queue

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞

inputs = [server, ]
outputs = []
data_queue = {}  # 存放返回给客户端消息的队列,每个客户端连接一个队列,就是一个item
# 3个参数,读列表,写列表,异常列表,就是你想让内核监视哪些链接
# 异常列表监视的还是链接返回的内容,那么还是在inputs里,所以第三个参数还是填inputs
# 开始什么都没有,只有server,先把server加到inputs里
# 就是select监视到server活动了,就可以返回了
# 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表)
while True:  # select返回活动链接==>for循环处理所有的活动链接,循环往复
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    # 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
    # 非阻塞socket就是通过select来实现阻塞
    print('select返回:\n', readable, writeable, exceptional)  # 下面有打印的内容
    # [<socket.socket fd=472, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000)>] [] []
    # 第一个fd,就是文件描述符,非负整数。
    for r in readable:  # 可能一次返回多个连接啊,所以得写个循环
        # 现在inputs里有server 和 conn了,
        # 如果readable返回的是server的活动,表示来了一个新链接
        # 如果readable返回的是conn的活动,表示收到了conn发来的数据
        if r is server:
            conn, addr = r.accept()  # r就是server
            print('接收到新的客户端连接:\n', conn, addr)
            inputs.append(conn)
            data_queue[conn] = queue.Queue()  # 创建连接的消息队列
        else:
            data = r.recv(1024)  # r就是conn
            print(data.decode('utf-8'))
            # 接下来处理发数据
            # 把有数据返回的连接当道outputs列表里,这样下次循环select就会监视到
            # 要发的数据也得保存,这里用队列来保存准备发送的数据
            # 要为每个连接分别建一个队列,不能把消息搞混,这里用字典
            # 字典的key就是连接,value就是该连接的消息队列
            outputs.append(r)
            # data_queue = {} 在使用前,要先建立一个空字典。这句放到While循环外
            # data_queue[conn] = queue.Queue  # 在客户端建立连接时,就创建好连接的消息队列。这句放在上面处理server.accept()里面
            data_queue[r].put(data.upper())  # 然后就先不管了。等到select再监视的时候,会返回到writeable列表里。
            # 所以后面还要写一个writeable的for循环
    # 虽然是在上面的for循环里添加的,但是要等到在执行一次select后才会在writeable里有返回值
    for w in writeable:
        data = data_queue[w].get()  # 从队列里取出数据。这里get了之后,这条消息就从队列里移除了
        w.send(data)  # 发数据,注意data的数据类型
        outputs.remove(w)  # 从outputs里移除这个活动的连接,否则下次过来还有尝试在发数据,但是消息队列里是空的

这里把之前客户端注释掉的内容去掉测试一下收数据。并且已经可以接入多个客户端了,开2个试一下就好了。

处理客户端断开的问题

剩下就是客户端断开的问题了。断开有两种情况: 一种是正常断开,客户端close(),会发送一个空给服务的,那么要在 data = r.recv(1024) 之后判断一下是不是空,就和之前写的socket服务器一样。 还有一种是强行关闭客户端,这时inputs仍然会收到活跃连接,但是recv的时候会抛出异常“ConnectionResetError”,这里大概要把recv放到try里,如果捕获到异常,就断开客户端。 另外还有一个exceptional异常列表有返回的情况,这里也粗暴的断开客户端处理了好了。 客户端断开就是要清除掉字典和列表中的这个连接的信息。

import select
import socket
import queue

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞

inputs = [server, ]
outputs = []
data_queue = {}  # 存放返回给客户端消息的队列,每个客户端连接一个队列,就是一个item
while True:  # select返回活动链接==>for循环处理所有的活动链接,循环往复
    # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表)
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    # 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
    print('select返回:\n', readable, writeable, exceptional)
    for r in readable:  # 可能一次返回多个连接啊,所以得写个循环
        # 现在inputs里有server 和 conn了,
        # 如果readable返回的是server的活动,表示来了一个新链接
        # 如果readable返回的是conn的活动,表示收到了conn发来的数据
        if r is server:
            conn, addr = r.accept()  # r就是server
            print('接收到新的客户端连接:\n', conn, addr)
            inputs.append(conn)  # 将新连接加入到select监视列表中
            data_queue[conn] = queue.Queue()  # 创建新连接的消息队列
        else:
            # 这里有3中情况,有数据,有空数据(正常断开),无数据(一旦recv就报错)
            try:
                data = r.recv(1024)
            except Exception as error:
                print("recv时捕获到异常:%s" % error)
                # 清除连接的4个操作,这段代码重复用了3次,应该专门写个函数引用
                # 1 从读列表中清除,这里其实不用判断,但是后面的for循环里可能会尝试重复remove
                # 2 如果还有没发出去的消息,把连接从写列表中清除
                # 3 关闭连接
                # 4 如果还有没发出去的消息,把消息队列的对象从字典里清除
                if r in inputs:
                    inputs.remove(r)
                if r in outputs:
                    outputs.remove(r)
                r.close()
                if r in data_queue:
                    del data_queue[r]
            else:
                # 这里处理能recv到数据,收到数据就加入outputs列表。为空就清除连接
                if data:
                    print(data.decode('utf-8'))
                    outputs.append(r)
                    data_queue[r].put(data.upper())
                else:
                    print("客户端已断开:\n", r)
                    inputs.remove(r)
                    if r in outputs:
                        outputs.remove(r)
                    if r in data_queue:
                        del data_queue[r]
    # 虽然是上面的for循环里添加的,但是要等到在执行一次select后才会在writeable里有返回值
    for w in writeable:
        data = data_queue[w].get()  # 从队列里取出数据。这里get了之后,这条消息就从队列里移除了
        w.send(data)  # 发数据,注意data的数据类型
        outputs.remove(w)  # 从outputs里移除这个活动的连接,否则下次过来还有尝试在发数据,但是消息队列里是空的
    # 还有一个exceptional没处理,还是和上面一样,再写一个for循环
    # 异常处理这里还是简单粗暴把异常列表中的连接清除就好了
    for e in exceptional:
        print("异常列表有返回:", e)
        if e in inputs:
            inputs.remove(e)
        if e in outputs:
            outputs.remove(e)
        e.close()
        if e in data_queue:
            del data_queue[e]

现在就可以处理高并发了,写一个多线程的客户端,每个线程循环发数据测试一下。 测试客户端:

import socket
import threading

HOST = 'localhost'
PORT = 9000
def client(i):
    client = socket.socket()
    client.connect((HOST, PORT))
    for j in range(100):
        msg = "hello %s %s" % (i, j)
        client.send(msg.encode('utf-8'))
        data = client.recv(1024)
        print('Received:', data.decode('utf-8'))
    client.close()

if __name__ == '__main__':
    for i in range(100):
        t = threading.Thread(target=client, args=(i,))
        t.start()

到这里感觉好了,但是还有3地方有问题,下面一个一个说明。上面的代码经不起强行断开客户端的考验。

send 和 recv 异常的问题

非阻塞模式下,如果调用recv()没有发现任何数据,或send()调用无法立即发送数据,那么将引起异常。我这里看到的是 "ConnectionResetError" 。 产生的原因是强行断开客户端,导致这个连接已经失效,但是连接还在select返回的列表里。这时之后的for循环里还会尝试去send或recv这个连接就会抛出异常。 解决办法 :send 和 recv 的时候都得用try,然后捕获到异常后,就把这个连接清理掉

改进writeable循环

data = data_queue[w].get() 这里有时候会报错 "KeyError" ,就是字典里已经没有这个key了。那就是在另外2个for循环里已经将这个key清除了。这里清除的时候没清writeable列表,所以在清除连接的时候加一句:

if r in writeable:
    writeable.remove(r)

或者是用字典的get方法读取,这样在读取不存在的key的时候,会返回None:

data = data_queue.get(w)
if data:
    data = data.get()
else:
    if w in outputs:
        outputs.remove(w)
    continue

remove队列的问题

上面有3处地方会清除连接,这里可以另外写一个函数,需要清除的时候调用函数就好了。另外可能会再两个for循环里同时会要清除同一个连接,这样在第二次清除的时候如果不做if判断,就会报错无法删除列表里不存在的元素。这里保险起见还是在remove之前都加上if判断。应该只有第一个for循环里的inputs是一定有的不用判断,别的地方都可能会报错。

writeable的另外一个写法

说好了3个问题,这里还有一个暂时没事出问题,就是可能发数据的时候一次发不完。现在的写法都是默认一次send就是发完的,send之后直接从outputs里remove掉。 也可以换一种方法,send之后,不从移出outputs列表。那么下次while循环还会进来,此时get队列的时候要用get_nowait无阻塞模式取队列,如果空会抛出队列空的异常,那么这个一定是发完了。此时再把链接移出outputs列表。如果需要可以把上面字典 "KeyError" 一起处理了,继续用 data = data_queue[w].get() 取列表,放到try里,捕获2种异常分别处理掉。 后面的最终版本没这么用,我怕再踩到坑,先把想法记着。

小结 - 最终版本

下面给出一个所有遇到的问题都解决了的最终版本:

import select
import socket
import queue

server = socket.socket()
server.bind(('localhost', 9000))
server.listen()
print("监听已经开启")
server.setblocking(False)  # 设置为非阻塞

inputs = [server, ]
outputs = []
data_queue = {}  # 存放返回给客户端消息的队列,每个客户端连接一个队列,就是一个item
while True:  # select返回活动链接==>for循环处理所有的活动链接,循环往复
    # 返回3个数据,监视到活动的3个列表(读列表,写列表,异常列表)
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)
    # 这里就是阻塞的,直到select监视到列表中有活动的链接,才会继续
    print('select返回:\n', readable, writeable, exceptional)
    for r in readable:  # 可能一次返回多个连接啊,所以得写个循环
        # 现在inputs里有server 和 conn了,
        # 如果readable返回的是server的活动,表示来了一个新链接
        # 如果readable返回的是conn的活动,表示收到了conn发来的数据
        if r is server:
            conn, addr = r.accept()  # r就是server
            print('接收到新的客户端连接:\n', conn, addr)
            conn.setblocking(False)
            inputs.append(conn)  # 将新连接加入到select监视列表中
            data_queue[conn] = queue.Queue()  # 创建新连接的消息队列
        else:
            # 这里有3中情况,有数据,有空数据(正常断开),无数据(一旦recv就报错)
            try:
                data = r.recv(1024)
            except Exception as error:
                print("recv时捕获到异常:%s" % error)
                # 清除连接的4个操作,这段代码重复用了3次,应该专门写个函数引用
                # 1 从读列表中清除,这里其实不用判断,但是后面的for循环里可能会尝试重复remove
                # 2 如果还有没发出去的消息,把连接从写列表中清除
                # 3 关闭连接
                # 4 如果还有没发出去的消息,把消息队列的对象从字典里清除
                if r in inputs:
                    inputs.remove(r)
                if r in outputs:
                    outputs.remove(r)
                r.close()
                if r in data_queue:
                    del data_queue[r]
            else:
                # 这里处理能recv到数据,收到数据就加入outputs列表。为空就清除连接
                if data:
                    print(data.decode('utf-8'))
                    if r not in outputs: outputs.append(r)
                    data_queue[r].put(data.upper())
                else:
                    print("客户端已断开:\n", r
                    if r in inputs:
                        inputs.remove(r)
                    if r in outputs:
                        outputs.remove(r)
                    r.close()
                    if r in data_queue:
                        del data_queue[r]
    # 虽然是上面的for循环里添加的,但是要等到在执行一次select后才会在writeable里有返回值
    for w in writeable:
        data = data_queue.get(w)  # 先从字典里取出这个队列
        if data:  # 队列存在,取数据
            data = data.get()  # 从队列里取出数据。这里get了之后,这条消息就从队列里移除了
        else:  # 队列不存在,这个连接已经被清除了,remove掉,下一个循环
            if w in outputs:
                outputs.remove(w)
            continue
        try:
            w.send(data)  # 发数据,注意data的数据类型
        except Exception as error:
            print("send时捕获到异常:%s" % error)
            if r in inputs:
                inputs.remove(r)
            if r in outputs:
                outputs.remove(r)
            r.close()
            if r in data_queue:
                del data_queue[r]
        else:
            if w in outputs:
                outputs.remove(w)  # 有可能一次send不完,这里也可以不remove,用另外一个方法
            # 开头用get_nowait无阻塞模式取队列,捕获到队列为空的异常,再remove掉这个连接
    # 还有一个exceptional没处理,还是和上面一样,再写一个for循环
    # 异常处理这里还是简单粗暴把异常列表中的连接清除就好了
    for e in exceptional:
        print("异常列表有返回:", e)
        if e in inputs:
            inputs.remove(e)
        if e in outputs:
            outputs.remove(e)
        e.close()
        if e in data_queue:
            del data_queue[e]

学了那么多都没用,应该也用不到,而且可能还会有别的问题。最可怕的是别人都不这么用,有问题都找不到人解决。主要是通过select的使用来了解IO多路复用。除了select,还有poll和epoll。 epoll 更高效,但是代码也更复杂。不过这些我们都用不到,都太底层了。平时用用已经封装好的模块就好了。这里通过学习大概了解一下底层是怎么实现的。剩下的会用模块就好了,现在至少知道模块中是怎么运作的了。 最后就是学一下下面的已经封装的selectors模块,底层都清楚了,学习使用下面的模块已经没有难度了。

selectors 模块

平时直接用已经封装好的模块,简单坑又少。selectors模块,默认会用epoll,如果系统不支持,就用select,完美。 理解了前面select的机制,在使用这个模块就简单了。步骤都一样。步骤都一样,但是都封装好了。

import selectors
import socket

def accept(sock, mask):
    """建立新连接并注册"""
    conn, addr = sock.accept()
    print('接收到新的客户端连接:\n', conn, addr)
    conn.setblocking(False)  # sock里已经设置为False,这里貌似没意义,反正没差
    # 上面已经建立好连接了,把新连接注册到sel里,这是第二次注册了
    # 第一次注册是注册server接受客户端连接请求的连接
    # 这里是连接建立后收发数据的连接,这个连接如果发现是活动的,调用的就是read方法了
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    """接收数据"""
    data = conn.recv(1024)
    if data:
        print(data.decode('utf-8'))
        conn.send(data.upper())
    else:
        print("客户端已断开:\n", conn)
        sel.unregister(conn)  # 注销这个连接
        conn.close()

def server(port):
    sock = socket.socket()
    sock.bind(('localhost', port))
    sock.listen()
    print("监听已经开启")
    sock.setblocking(False)
    # 下面就是select方法,也可能是epoll
    # 注册你的socket,就是让select监视,监视到有活动的连接,就调用accept函数
    sel.register(sock, selectors.EVENT_READ, accept)
    # 上面还没有开始监视,只是先把select准备好
    while True:
        events = sel.select()  # 这里就还是监视了
        # 这里会阻塞,一旦有活动的连接,就会返回给events列表
        for key, mask in events:
            callback = key.data  # key.data就是sel.register里的accpet这个函数
            # 现在callback就是accpet这个函数了,下面加上括号填上参数就执行了
            callback(key.fileobj, mask)  # key.fileobj就是sel.register里的sock

if __name__ == '__main__':
    sel = selectors.DefaultSelector()  # 老套路,用之前先实例化一个对象
    server(10001)

还是有那个老问题,客户端强制断开,服务端会报错 "ConnectionResetError" 。

作业

SELECT版FTP :

  • SELECT或SELECTORS模块实现并发简单版FTP
  • 允许多用户并发上传下载文件
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 多线程的使用场景
  • 多进程
    • 父进程和子进程的id号
      • 进程间通讯
        • 通过队列 - Queue
        • 通过管道 - Pipe
        • 通过 Manager 共享
        • 进程锁 Lock
      • 进程池
        • apply
        • apply_rsync
        • 回调函数
    • 协程
      • yield
        • greenlet 模块
          • gevent 模块
            • 并发爬网页
            • 高并发的socket_server
        • 事件驱动与异步IO
          • 事件驱动编程范式
            • 五种IO模式
              • IO模式
            • IO多路复用
              • 写一个非阻塞的socket
                • 开启监听
                • 接收一条数据
                • 可以接收多个连接请求
                • 将收到的数据发回给客户端
                • 处理客户端断开的问题
                • send 和 recv 异常的问题
                • 改进writeable循环
                • remove队列的问题
                • writeable的另外一个写法
                • 小结 - 最终版本
            • selectors 模块
            • 作业
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档