专栏首页零基础使用Django2.0.1打造在线教育网站Python中的多路复用 (select、poll 和 epoll)

Python中的多路复用 (select、poll 和 epoll)

多路复用太复杂了,和以往的编程方式差别很大,一时半会大家可能理解不了。在写这篇文章的时候,我复习了一些进程、线程和协程相关的内容,但还是难免理解困难,因此只希望大家通过这篇文章对Python中的IO多路复用有个了解就行。

我们知道CPU的计算时间远比IO操作所花费的时间小的多,因此在阻塞式IO中,CPU的利用率不是很高。

而在非阻塞式IO中,没有等待立即返回(当然阻塞是不会消耗CPU的),但是这里面存在一个问题就是无法知晓是否已完成,需要二次判断(需要花费大量时间用于状态判断)。如果后续操作是建立在前面完成的基础上,那这个非阻塞式IO的效果并没有那么好,甚至会比阻塞式IO还差;如果后续操作不依赖于前述操作,而效果非常明显。因此无法准确说明是非阻塞式IO强于阻塞式IO,还是阻塞式IO强于非阻塞式IO,没有一个结论,需要结合具体的应用场景。

而在IO复用中,select方法其实也是阻塞的,如果操作系统中没有一个socket或者没有一个文件句柄准备好了,这个情况下它会一直阻塞下去,有的话就会立即返回,实际上它节省了等待数据的过程,但是将数据从内核拷贝到用户空间这一过程还是无能为力。

最后的异步IO才是真正意义上的异步IO,以aio开头。但是现在我们所接触的很多高并发框架都没有使用aio,而是使用IO多复用机制。IO多复用机制非常成熟,而且较为稳定,真正的aio在我们运用中并不多,因为在使用过程中这个aio比我们的IO多复用性能并没有非常大的提升,因此目前还是使用IO多复用。真正意义上的aio其实是少了拷贝数据的过程(操作系统准备好了以后才发),因为操作系统会将数据从内核复制到用户空间之后,再给我们的信号处理程序发起一个请求。这才是真正意义上的异步IO,但是它的编码难度很大,比IO多复用高很多,因此平常使用的框架都是在大量使用IO多复用技术。

select、poll、epoll

select、poll、epoll都是IO多路复用的机制。IO多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者是写就绪),能够通知程序进行相应的读写操作。但select、poll、epoll本质上都是同步IO,因为它们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步IO则无需自己负责进行读写,异步IO的实现会负责把数据从内核拷贝到用户空间。(操作系统会将数据从内核复制到用户空间之后,再给我们的信号处理程序发起一个请求)

select

select函数监视的文件描述符分为3类,分别是writefds、readfds和exceptfds。调用后select函数会被阻塞,直到有描述符就绪(有数据可读、可写或者有except)、或者超时(timeout可用于指定等待时间,如果想立即返回可设置为null),函数返回。当select函数返回后,可以通过遍历fdset来找到就绪的描述符。

目前几乎在所有的平台上都支持select,其良好的跨平台支持也是它的一个优点。不过它的缺点在于单个进程能够监视的文件描述符的数量是有限制的,在linux上一般为1024,但是可以通过修改宏定义甚至是重新编译内核的方式来提升这一限制,不过这样也会降低效率,如上面需要遍历所有的fdset。那么在此之后就发展产生了poll。

poll

不同于select使用三个位图来表示三个fdset的方式,poll使用一个pollfd指针来实现。pollfd结构包含了要监视的event和发生的event,不再使用select的“参数-值”传递的方式。同时pollfd并没有最大数量限制(但是数量过大后其性能也会降低)。和select函数一样,poll返回后需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。实际上,同时连接的大量客户端在同一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

epoll只在Linux下支持,Windows下面并不支持,它是在Linux的2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间只需复制一次。(使用了数据结构中性能很高的红黑树)

但是需要说明的是epoll并不一定比select好,这个需要结合具体情况来分析:在并发高的情况下,且连接活跃度不是很高(如Web系统),此时epoll就比select好;当并发性不是很高,但连接活跃度很高的时,select就比epoll好。

通过非阻塞IO实现http请求

下面是一段使用socket来模拟http请求的代码:

# requests -->urllib -->socket
import socket
from urllib.parse import urlparse


def get_url(url):
    # 通过socket请求Url
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))

    client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))# 相对路径
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break

    data = data.decode('utf8')
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == '__main__':
    get_url("http://www.baidu.com/")

如果你单纯的设置为非阻塞IO,那么程序运行会报错:

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)  # 置为非阻塞IO
client.connect((host, 80))  # 此行代码就会报BlockingIOError错误

# 运行结果:
BlockingIOError: [WinError 10035] 无法立即完成一个非阻止性套接字操作。

当然我们前面也说过阻塞不会消耗cpu,但是会影响程序的运行啊,所以需要捕捉异常,这个异常的出现是正常的,可以不用理会,但要捕捉,修改如下:

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.setblocking(False)
try:
    client.connect((host, 80))  # 此行代码就会报BlockingIOError错误
except BlockingIOError as e:
    pass

# 运行结果:
OSError: [WinError 10057] 由于套接字没有连接并且(当使用一个 sendto 调用发送数据报套接字时)没有提供地址,发送或接收数据的请求没有被接受。

这个错误也是非常明显,就是因为我们还没有建立连接,下一步的 client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))# 相对路径其实是依赖于上一步的状态,因此需要不停的去尝试获取链接,修改如下:

while True:
    try:
        client.send(
            "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))  # 相对路径
        break
    except OSError as e:
        pass

修改后继续运行,如果不出所料还是会报之前的BlockingIOError错误,知道原因在哪里么?对,之前是发送数据出了问题,那么接受数据也会出问题,修改代码如下:

 while True:
        try:
            d = client.recv(1024)  # 此行代码就会报BlockingIOError错误
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

之后运行发现没有问题。

在前面使用了非阻塞式IO实现了http请求,但是后续的操作是依赖于前序操作,如果前序操作出了问题,后续也会出错,而且后续还是不断的请求,其实并没有减少返回的时间,并没有提高我们的并发。接下来接受如何使用select来完成我们的http请求。

通过select实现http请求

我们知道import select包中有一个select方法,我们可以看一下它的源码,里面有四个参数:rlist, wlist, xlist, timeout

import select
select.select()
def select(rlist, wlist, xlist, timeout=None):

但是我们实际上不直接使用它,而是使用from selectors import DefaultSelector,可以发现这个DefaultSelector中依旧是调用了select函数:

  if sys.platform == 'win32':
        def _select(self, r, w, _, timeout=None):
            r, w, x = select.select(r, w, w, timeout)
            return r, w + x, []

不过它将我们这个select函数进行了封装,使得使用起来更加方便。前面说过epoll只在Linux下支持,Windows下面并不支持,但是使用了这个DefaultSelector以后,你就不用操心到底选用poll还是epoll,它会根据平台自动选择,在windows上使用select,在Linux上使用epoll,因此更推荐大家使用DefaultSelectorDefaultSelector其实除了可以选择IO复用的方法以外,还提供了一种注册的机制,往下看就知道了。接下来我们不再使用函数来实现http请求,而是使用一个类来实现,因为在后续后调过程中会使用之前的变量,一旦使用了函数实现,变量就仅仅是局部变量了,无法提供给后续使用,尽管使用全局变量可以解决这个问题,但是由于涉及到的全局变量的数量过多,因此不太建议使用全局变量这种模式,推荐使用类来实现。

我们使用了DefaultSelector的注册机制,将socket注册到selector中,这一步非常关键,需要使用全局的selector,因此需提前实例化一个selector = DefaultSelector(),同时发现这个register方法的源码:

def register(self, fileobj, events, data=None):

里面有三个参数,第一个是fileobj也就是socket event(一般是socket的文件描述符fileno());第二个是events就是我们的事件,如EVENT_READ,EVENT_WRITE等,记住发送信息是WRITE事件;第三个参数是data即回调函数,select模式都是回调函数组成的,即当它变为可写的时候,我们应该执行什么逻辑。

注意有一个事件循环的概念,如果你之前没有接触过这种回调模式,可能会想当然的以为一个函数中有回调,然后当它变为可读的时候,操作系统会帮我们调用可读应当执行的逻辑,其实这是错误的思维,回调仍然是由程序员来完成的。也就是说,当它变为可读的时候,我们需要调用可读时应当执行的逻辑。因为我们需要写一个loop函数,去不定的调用selector来判断哪一个socket准备就绪,它是可读还是可写并调用相应的回调函数,再次强调一遍,这个回调是自己来完成的。

def loop():
    # 1、select本身不支持register模式,此处的selector是对select的一个封装
    # 2、socket状态变化以后的回调是由程序员完成的,不是操作系统完成的(此处使用IO多路复用)
    while True:
        ready  = selector.select()
        for key,mask in ready:
            call_back = key.data
            call_back(key)

注意select本身不支持register模式,select只能传递readable,writeable等socket的句柄,然后操作系统将可读或者可写的句柄返回过来,但是这个可读或者可写的回调函数本身是什么,select本身是不提供的,此处的selector是对select的一个封装,因此可以完成register模式,这样在使用的时候,拿到某一个socket的句柄时,就知道应该回调哪个函数或者方法,这就是我们建议使用selector的原因。除此之外,socket状态变化以后的回调是由程序员完成的,不是操作系统完成的,此处使用IO多路复用select。

这里的loop循环是主循环,会一直在selector中查询一些信息,可以查看selector.select()函数的源码,注意此处的select方法和前面import select ,select.select()中的select方法不同,前面的select方法需要传入四个参数:

def select(rlist, wlist, xlist, timeout=None):

这个在前面就已经介绍过了。继续回到封装后的selector.select()方法:

    def select(self, timeout=None):
        timeout = None if timeout is None else max(timeout, 0)
        ready = []
        try:
            r, w, _ = self._select(self._readers, self._writers, [], timeout)
        except InterruptedError:
            return ready
        r = set(r)
        w = set(w)
        for fd in r | w:
            events = 0
            if fd in r:
                events |= EVENT_READ
            if fd in w:
                events |= EVENT_WRITE

            key = self._key_from_fd(fd)
            if key:
                ready.append((key, events & key.events))
        return ready

可以发现最后返回的是一个read列表,里面存放的是一个个tuple,tuple中包含的是keyevents&key.events(events的位运算)。这里主要关心key,这个key是什么类型呢?可以在register函数中发现它调用了父类的register:

    def register(self, fileobj, events, data=None):
        key = super().register(fileobj, events, data)

在父类的register中可以发现这个key的类型,它其实是SelectorKey对象:

key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)

再来看一下SelectorKey这个类的源码,发现里面其实是一个命名的元组,包含了诸如 ['fileobj', 'fd', 'events', 'data']等属性。

SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])

同时注意到此处的select是不需要传入参数,因为在前面已经使用selectorregister函数将socket传入了,因此再次推荐大家使用selector

def select(self, timeout=None):

事件循环,不停的请求socket状态,并调用对应的回调函数(事件循环模式在使用IO多路复用的时候都会存在,比如Twisted,Tornado,Gevent,协程,asyncIO等,都是这种模式,即回调+事件循环+select(pol/epoll)模式)。完整的代码如下所示:

#!/usr/bin/python
# -*- coding:utf-8 -*-  
# @Time: 2018/11/1 22:11  
# @Author: Envy  

# epoll并不一定比select好,这个需要结合具体情况来分析:
# 在并发高的情况下,且连接活跃度不是很高(如Web系统),此时epoll就比select好;
# 当并发性不是很高,但连接活跃度很高的时,select就比epoll好。


# 通过非阻塞IO实现Http请求

import socket
from urllib.parse import urlparse

# 通过select来完成http请求
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
selector = DefaultSelector()
# selector其实是封装了select


class Fetcher(object):
    def connected(self,key):
        selector.unregister(key.fd)  # 此处的key.fd其实就是self.client.fileno()的返回值
        # 注意此处不再需要将send方法进行异常捕获,因为我们采用了事件监听,当调用client的时候就说明已经处于就绪状态,不再需要之前的轮询操作
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf8'))  # 相对路径
        # 接下来又需要监听socket,看它什么时候是可读状态,因此需要传入EVENT_READ可读事件
        selector.register(self.client.fileno(),EVENT_READ,self.readable)  # 这里是函数名称,不是函数调用

    # 当我们知道socket已经是可读的时候,我们应该处理什么逻辑,也就是readable函数
    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()

    def get_url(self,url):
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)   # 使用非阻塞IO
        try:
            # 此行代码就会报BlockingIOError错误,因此需要捕获
            self.client.connect((self.host, 80))    # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        # 将socket注册到selector中,这一步非常关键,需要使用全局的selector
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)  # 这里是函数名称,不是函数调用


def loop():
    # 事件循环,不停的请求socket状态,并调用对应的回调函数(事件循环模式在使用IO多路复用的时候都会存在,如Twisted,Tornado,Gevent,协程,asyncIO等
    # 1、select本身不支持register模式,此处的selector是对select的一个封装
    # 2、socket状态变化以后的回调是由程序员完成的,不是操作系统完成的(此处使用IO多路复用)
    while True:
        ready  = selector.select()
        for key,mask in ready:
            call_back = key.data  # 回调函数
            call_back(key)  # 拿到回调函数后就能调用它,这个key是SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])中的信息
    # 回调 + 事件循环 + select(pol / epoll)模式


if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com/")
    # 当运行主函数的get_url后,代码从def get_url(self,url):至selector.register处,之后的回调函数是由loop函数来决定的
    # 接下来由loop函数来决定使用哪个回调函数
    loop()

运行以后会发现百度首页确实被抓取下来了,但是在末尾程序抛出了一个错误:

r, w, x = select.select(r, w, w, timeout)
OSError: [WinError 10022] 提供了一个无效的参数。

问题出现在这行语句上面:

ready  = selector.select()

其实这不算是错误,那是因为在wndows中,默认调用的select是import select ,select.select()中的select方法,需要传入四个参数,而我们没有传递参数

def select(rlist, wlist, xlist, timeout=None):

但是如果把这个代码放到linux上运行时,就不会出错,默认会使用epoll select,不会发生异常,会一直阻塞下去。那么要在Windows上解决这个问题,应该怎样操作呢?

其实非常简单,只需要将url提出,放于一个列表中,接着定义一个标志默认为False,当读完某个url的时候,就将其从列表中移除,然后标志变为True,最后修改loop函数中永真表达式,while not stop即可,修改后的代码如下:

#!/usr/bin/python
# -*- coding:utf-8 -*-  
# @Time: 2019/11/1 22:11
# @Author: Envy
# @File: select_http_test.py

# epoll并不一定比select好,这个需要结合具体情况来分析:
# 在并发高的情况下,且连接活跃度不是很高(如Web系统),此时epoll就比select好;
# 当并发性不是很高,但连接活跃度很高的时,select就比epoll好。


# 通过非阻塞IO实现Http请求

import socket
from urllib.parse import urlparse

# 通过select来完成http请求
from selectors import DefaultSelector,EVENT_READ,EVENT_WRITE
selector = DefaultSelector()
# selector其实是封装了select
urls = ["http://www.baidu.com/"]
stop = False


class Fetcher(object):
    def connected(self,key):
        selector.unregister(key.fd)  # 此处的key.fd其实就是self.client.fileno()的返回值
        # 注意此处不再需要将send方法进行异常捕获,因为我们采用了事件监听,当调用client的时候就说明已经处于就绪状态,不再需要之前的轮询操作
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf8'))  # 相对路径
        # 接下来又需要监听socket,看它什么时候是可读状态,因此需要传入EVENT_READ可读事件
        selector.register(self.client.fileno(),EVENT_READ,self.readable)  # 这里是函数名称,不是函数调用

    # 当我们知道socket已经是可读的时候,我们应该处理什么逻辑,也就是readable函数
    def readable(self,key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()

            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self,url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)   # 使用非阻塞IO
        try:
            # 此行代码就会报BlockingIOError错误,因此需要捕获
            self.client.connect((self.host, 80))    # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        # 将socket注册到selector中,这一步非常关键,需要使用全局的selector
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)  # 这里是函数名称,不是函数调用


def loop():
    # 事件循环,不停的请求socket状态,并调用对应的回调函数(事件循环模式在使用IO多路复用的时候都会存在,如Twisted,Tornado,Gevent,协程,asyncIO等
    # 1、select本身不支持register模式,此处的selector是对select的一个封装
    # 2、socket状态变化以后的回调是由程序员完成的,不是操作系统完成的(此处使用IO多路复用)
    while not stop:
        ready  = selector.select()
        for key,mask in ready:
            call_back = key.data  # 回调函数
            call_back(key)  # 拿到回调函数后就能调用它,这个key是SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])中的信息
    # 回调 + 事件循环 + select(pol / epoll)模式


if __name__ == '__main__':
    fetcher = Fetcher()
    fetcher.get_url("http://www.baidu.com/")
    # 当运行主函数的get_url后,代码从def get_url(self,url):至selector.register处,之后的回调函数是由loop函数来决定的
    # 接下来由loop函数来决定使用哪个回调函数
    loop()

再次运行发现程序可以输出所需信息。

总结一下,这里我们采用了select(poll/epoll)+事件循环+回调模式,这种模式和以前同步开发的模式差异很大,特别是事件循环的引入,如果大家对回调模式不是很了解,就会搞不清楚为什么需要一个事件循环。再一个就是回调,我们在seletor中注册了回调函数,这种模式也和之前的开发模式差异也很大,还有一个就是使用了类,而不是函数来编程,因为这其中涉及到回调函数,回调函数里面包含很多的实例属性(准确来说是全局的变量),使用函数是很难实现的,因此在实际开发建议使用类。

回过头来,感觉我们把一个非常简单的问题变得很复杂起来,而且采用了不同于以往的模式,它有什么好处,值得我们去这么费劲去使用呢?

它的好处就是并发性高,上面都是依赖于loop函数这个事件循环,它知道我们应该去调用哪个代码,去执行什么方法,这里面不会去阻塞我们建立连接或者是等待的过程,实际上是在不停的请求哪些socket已经准备好了,一旦某个socket准备好,就立马执行回调函数,在回调函数中没有处理一些与IO操作相关的逻辑,都是CPU操作,CPU操作效率是远远超过IO操作,特别是网络IO,它们不在一个单位级上面。因此相对于线程切换,CPU的操作更快,以前当我们需要同时抓取多个url的时候,需要使用多个线程,由于GIL的存在使得我么需要在这些线程中来回切换;现在有了这种回调和事件循环的模式,我们只需要一个线程,当一个url的连接正在建立的时候,由于这是非阻塞式方法,它会立马返回,然后注册到selector中,然后selector中的select方法去找那些已经准备就绪的socket,它有可能是之前的任意一个url,如可读或者可写的状态都是可能的,它只会去寻找那些已经准备好的socket,然后执行它的回调方法,所以不会有再去等待网络IO的情况,除非所以的url都在阻塞,这样看来上面使用的单线程模式省去了因线程切换花费的开销,还有它的内存,一个线程的内存是远远高于函数回调这种模式的,回调函数其实就是指向函数的一个句柄而已,所以在并发的时候,系统中如果存在几十个线程时,它的切换时间就会很长,效率很慢。但是这个回调模式对于成千上万个线程时,它都不会有任何线程切换和内存消耗问题,因为它是单线程模式,其实这个也是协程的核心点,Twisted,Tornado,Gevent,协程,asyncIO等,也都是这种回调+事件循环+select(pol/epoll)模式

多路复用效率说明

可能大家还是觉得它的并发性不是太明显,接下来通过同时抓取17个url的例子(因为当时只有17页内容)给大家展示效率问题,以我个人博客为例:编程思录:

# !/usr/bin/python
# -*- coding:utf-8 -*-  
# @Time: 2018/11/1 22:11
# @Author: Envy
# @File: select_http_test.py

# epoll并不一定比select好,这个需要结合具体情况来分析:
# 在并发高的情况下,且连接活跃度不是很高(如Web系统),此时epoll就比select好;
# 当并发性不是很高,但连接活跃度很高的时,select就比epoll好。


# 通过非阻塞IO实现Http请求

import socket
from urllib.parse import urlparse

# 通过select来完成http请求
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
# selector其实是封装了select
urls = []
stop = False


class Fetcher(object):
    def connected(self, key):
        selector.unregister(key.fd)  # 此处的key.fd其实就是self.client.fileno()的返回值
        # 注意此处不再需要将send方法进行异常捕获,因为我们采用了事件监听,当调用client的时候就说明已经处于就绪状态,不再需要之前的轮询操作
        self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode(
            'utf8'))  # 相对路径
        # 接下来又需要监听socket,看它什么时候是可读状态,因此需要传入EVENT_READ可读事件
        selector.register(self.client.fileno(), EVENT_READ, self.readable)  # 这里是函数名称,不是函数调用

    # 当我们知道socket已经是可读的时候,我们应该处理什么逻辑,也就是readable函数
    def readable(self, key):
        d = self.client.recv(1024)
        if d:
            self.data += d
        else:
            selector.unregister(key.fd)
            data = self.data.decode('utf8')
            html_data = data.split("\r\n\r\n")[1]
            print(html_data)
            self.client.close()

            urls.remove(self.spider_url)
            if not urls:
                global stop
                stop = True

    def get_url(self, url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)  # 使用非阻塞IO
        try:
            # 此行代码就会报BlockingIOError错误,因此需要捕获
            self.client.connect((self.host, 80))  # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        # 将socket注册到selector中,这一步非常关键,需要使用全局的selector
        selector.register(self.client.fileno(), EVENT_WRITE, self.connected)  # 这里是函数名称,不是函数调用


def loop():
    # 事件循环,不停的请求socket状态,并调用对应的回调函数(事件循环模式在使用IO多路复用的时候都会存在,如Twisted,Tornado,Gevent,协程,asyncIO等
    # 1、select本身不支持register模式,此处的selector是对select的一个封装
    # 2、socket状态变化以后的回调是由程序员完成的,不是操作系统完成的(此处使用IO多路复用)
    while not stop:
        ready = selector.select()
        for key, mask in ready:
            call_back = key.data  # 回调函数
            call_back(
                key)  # 拿到回调函数后就能调用它,这个key是SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])中的信息
    # 回调 + 事件循环 + select(pol / epoll)模式


if __name__ == '__main__':
    fetcher = Fetcher()

    import time
    start_time = time.time()  # 开始时间
    for i in range(17):
        url = "https://blog.licheetools.top/page/{}/".format(i)
        urls.append(url)
        fetcher = Fetcher()
        fetcher.get_url(url)
    # 当运行主函数的get_url后,代码从def get_url(self,url):至selector.register处,之后的回调函数是由loop函数来决定的
    # 接下来由loop函数来决定使用哪个回调函数
    loop()
    end_time = time.time()   # 结束时间
    print(end_time-start_time)

//运行结果:
抓取的数据。。。。。。
0.57608962059021

我们再来使用之前的select代码,注意取消其中的client.setblocking(False),因为采用阻塞式模式会报错,其实也不会报错只是效率影响一些,修改后的代码如下:

#!/usr/bin/python
# -*- coding:utf-8 -*-  
# @Time: 2018/10/11 22:16
# @Author: Envy
# @File: select_test.py


# epoll并不一定比select好,这个需要结合具体情况来分析:
# 在并发高的情况下,且连接活跃度不是很高(如Web系统),此时epoll就比select好;
# 当并发性不是很高,但连接活跃度很高的时,select就比epoll好。


# 通过非阻塞IO实现Http请求

# requests -->urllib -->socket
import socket
from urllib.parse import urlparse

# 通过非阻塞IO完成Http请求


def get_url(url):
    # 通过socket请求Url
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # client.setblocking(False)
    try:
        client.connect((host, 80))  # 此行代码就会报BlockingIOError错误
    except BlockingIOError as e:
        pass

    while True:
        try:
            client.send(
                "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))  # 相对路径
            break
        except OSError as e:
            pass
    data = b""
    while True:
        try:
            d = client.recv(1024)  # 此行代码就会报BlockingIOError错误
        except BlockingIOError as e:
            continue
        if d:
            data += d
        else:
            break

    data = data.decode('utf8')
    html_data = data.split("\r\n\r\n")[1]
    print(html_data)
    client.close()


if __name__ == '__main__':
    import time
    start_time = time.time()  # 开始时间

    for i in range(17):
        url = "https://blog.licheetools.top/page/{}/".format(i)
        get_url(url)
    end_time = time.time()   # 结束时间
    print(end_time-start_time)
//运行结果:
抓取的数据。。。。。。
6.399325132369995

明显发现这个速度很慢,是抓到一个输出一个的方式。

上面通过使用回调+事件循环+select(pol/epoll)模式来实现并发,注意本文使用的是单线程,其实你还可以通过这个回调+事件循环+select(pol/epoll)模式来实现一个聊天群,群里面是某个人发消息后会调用这个selector.select来获取其他人的socket,然后将这个socket中的数据发送过去。

回调模式弊端

在前面,我们通过使用回调+事件循环+select(pol/epoll)模式来实现并发,它的性能非常高但写法与之前的编程方式差别很大。同步编程的方式是非常简单易懂的思维模式,回调+事件循环+select(pol/epoll)模式却给我们现有的编程模式提出了很大的挑战,这种回调使得我们在去进行下一步操作的时候,让事件循环去驱动回调,而且它将传统的代码分割的较为散乱。之前使用传统的socket方式去请求url的时候,代码是从上而下执行,代码逻辑非常清晰易懂:先建立连接,然后发送数据,接着等待数据返回并接收,最后关闭连接。而使用回调+事件循环+select(pol/epoll)模式时,不知从哪里冒出来一个事件循环,且它是一直循环的状态。

    def get_url(self,url):
        self.spider_url = url
        url = urlparse(url)
        self.host = url.netloc
        self.path = url.path
        self.data = b""
        if self.path == "":
            self.path = "/"

        # 建立socket连接
        self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.client.setblocking(False)   # 使用非阻塞IO
        try:
            # 此行代码就会报BlockingIOError错误,因此需要捕获
            self.client.connect((self.host, 80))    # 阻塞不会消耗CPU
        except BlockingIOError as e:
            pass

        # 将socket注册到selector中,这一步非常关键,需要使用全局的selector
        selector.register(self.client.fileno(),EVENT_WRITE,self.connected)  # 这里是函数名称,不是函数调用

这个get_url函数运行到 selector.register之前逻辑也较为清晰,只是之后就是注册机制,注册了一个回调函数connected,后面就没代码了。而且将发送数据的逻辑放到了回调函数connected里面,同时在回调函数connected函数中又将回调函数readable注册进去,这样我们原本从上而下顺序执行的三个阶段,就被拆分到了三个函数中,使得代码分割的较为散乱,加大了代码维护的难度,因此就说这种模式使用起来是非常不顺手的。接下来看看到底有哪些不顺之处?

1、回调函数执行不正常该如何?(无法定位异常,排除错误) 2、回调函数里面需要嵌套回调,甚至是多层嵌套,该如何?(多层回调情况很常见) 3、回调函数中多层嵌套时,其中某个环节出了问题会造成什么后果?4、如果有某个数据需要被多个,甚至是每个回调函数处理,那该如何操作?5、如何使用当前函数中的局部变量?(只能使用函数,不能使用类的情况)

上面这种弊端非常难以解决,我们迫切希望能有一种方式,既能拥有回调模式的高性能,又有同步编程方式的写法(或者说是逻辑思维,易懂,其实就是代码自上而下运行),这就是协程存在的理由。

之所以介绍IO多路复用其实是为了说明协程诞生的背景和意义它将高并发与传统编码思维进行融合,由此实现了高并发的易用性和可读性。那么写到这里,本篇文章就结束了,但是关于IO多路复用的知识,可能我所了解的只是皮毛,还需要更深层次的去学习和使用。

本文分享自微信公众号 - 啃饼思录(kbthinking)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 你有分析过自己单身的原因吗?

    好奇这个调查是怎么来的?猪哥认为真实性有待考证,刚好这几天我们也学习了如何爬取微博话题,今天就来分析一下为何很多同学如此优秀却依然单身!

    测试小兵
  • 真狠,为了干掉 HTTP ,Spring团队又开源nohttp了!

    Spring Security、Session 和 LDAP 项目负责人 ROB WINCH 指出,Spring 团队竭尽全力更新所有 URL 以使用 HTTP...

    搜云库技术团队
  • http编程系列(二)——java爬虫实现刷个人博客的访问量

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    逝兮诚
  • 爬取友商产品信息

    产品类别url地址为:http://www.dahuatech.com/product.html

    zx钟
  • shiro源码解析-doFilter

    shiro应该算的上java中最流行的权限框架了,使用的多了,便想着研究一下源码,看它究竟怎么运行的。

    逝兮诚
  • Python Web 之Flask基础(一)

    在 Flask 应用中定义路由的最简便方式是使用Flask实例提供的 app.route 装饰器。

    arcticfox
  • springMVC系列(七)——springMVC实现restful风格开发(post、get、put、delete)

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    逝兮诚
  • CVE-2019-11043PHP-FPM在Nginx特定配置下远程代码执行漏洞复现

    9月26日,PHP官方发布漏洞通告,提到Nginx与php-fpm服务器上存在的一处高危漏洞,由于Nginx的fastcgi_split_path_info模块...

    墙角睡大觉
  • 巧用linux云服务器下的的/dev/shm/,避开磁盘IO不给力!

    tmpfs是Linux/Unix系统上的一种基于内存的文件系统。tmpfs可以使用您的内存或swap分区来存储文件。由此可见,tmpfs主要存储暂存的文件。它有...

    小小科
  • http编程系列(一)——URL使用和爬取博客图片小DEMO

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    逝兮诚

扫码关注云+社区

领取腾讯云代金券