首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python之I/O多路复用

Python之I/O多路复用

作者头像
py3study
发布2020-01-09 18:07:56
8530
发布2020-01-09 18:07:56
举报
文章被收录于专栏:python3python3

回顾Socket

一、Socket起源:

socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,对于文件用【打开】【读写】【关闭】模式来操作。

socket就是该模式的一个实现,socket即是一种特殊的文件,一些socket函数就是对其进行的操作(读/写IO、打开、关闭)

“他是所有WEB服务器的祖宗”

pupepet、ansible、他们也可以通过输入命令然后返回结果这个也是基于Socket来实现的。

二、socket和file的区别:     file模块是针对某个指定文件进行【打开】【读写】【关闭】     socket模块是针对 服务器端 和 客户端Socket 进行【打开】【读写】【关闭】

三、原生Socket增强:

上一篇《初识socket》:server端他们仅能处理一个请求在有连接过来的时候,如果第一个请求在和服务器连接中,那么第二个只能等待第一个断开后第二个才能连接

wKioL1cQriyiYxCKAABojrq3VhA360.png
wKioL1cQriyiYxCKAABojrq3VhA360.png

过程:

第一请求发发了一个操作,server端返回了,那么现在两头等在等待这输入。

那么这段时间第二个请求还在等待!现在服务端是不是在空闲着呢?他只占着I/O资源,CPU是不是空闲着呢?他阻塞着后面的请求无法进来。

那么遇到这种情况如何解决呢?继续往下看

网络IO模型:阻塞IO和非阻塞IO|同步IO和异步IO

介绍:

    网络I/O模型讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX? Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不 懂。

一、什么是I/O

1、先了解什么是I/O:I/O(input/output),即输入/输出端口。每个设备都会有一个专用的I/O地址,用来处理自己的输入输出信息。

2、I/O model:阻塞:blocking IO、非阻塞:non-blocking IO、同步:synchronous  IO  、 异步:asynchronous IO 之间的区别

3、IO发生时涉及的对象和步骤:以输入操作的socket为例:第一步:首先等待网络数据到达,当数据接收就会复制到内核缓冲区中,第二步:复制从内核缓冲区到应用缓冲区

  1. 等待数据准备 (Waiting for the data to be ready)
  2. 将数据从内核拷贝到进程中(Copying the data from the kernel to the process)  记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

二、Blocking I/O Model 默认情况下所有的Socket是阻塞(分享例子----分享完后需要删除本括号内容),看下面的图例:

wKiom1cQrebTqhAdAABoCoKdT4o581.png
wKiom1cQrebTqhAdAABoCoKdT4o581.png

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如没有收到一个完整的TCP/UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除 block的状态,重新运行起来。

所以阻塞:blocking IO的特点是I/O执行时的两个操作(等待数据准备 (Waiting for the data to be ready)、将数据从内核拷贝到进程中(Copying the data from the kernel to the process))都是阻塞的。

python socket中:accept()  recv() 是阻塞的

所以,所谓阻塞型接口是指系统调用(一般是IO接口)如果不返回结果就一直阻塞,就是socket经常说的,有发就有收收发必相等如果两边都在同时收,是不是阻塞着后面的代码就无法执行?

那既然原生的Socket是阻塞的,那有什么办法来解决呢?

使用多线程(或多进程)、多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

我们假设对上述的服务器 / 客户机模型,提出更高的要求,即让服务器同时为多个客户机提供一问一答的服务。于是有了如下的模型。

wKiom1cQrg6Dz3EMAAHuXcPu6xs200.png
wKiom1cQrg6Dz3EMAAHuXcPu6xs200.png

在上述的线程 / 时间图例中,主线程持续等待客户端的连接请求,如果有连接,则创建新线程,并在新线程中提供为前例同样的问答服务。

很多初学者可能不明白为何一个socket可以accept多次。实际上socket的设计者可能特意为多客户机的情况留下了伏笔,让accept()能够返回一个新的socket。

执行完bind()和listen()后,操作系统已经开始在指定的端口处监听所有的连接请求,如果有请求,则将该连接请求加入请求队列。

调用accept()接口正是从的请求队列抽取第一个连接信息,创建一个新的socket返回句柄。新的socket句柄即是后续read()和recv()的输入参数。如果请求队列当前没有请求,则accept()将进入阻塞状态直到有请求进入队列。 上述多线程的服务器模型似乎完美的解决了为多个客户机提供问答服务的要求,但其实并不尽然。如果要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。 很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。

这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。 对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

三、非阻塞:non-blocking IO 

#!/usr/bin/env python
#-*- coding:utf-8 -*-
import time
import socket
#创建socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
#设置监听的IP与端口
sk.bind(('127.0.0.1',6666))
#设置client最大等待连接数
sk.listen(5)
sk.setblocking(False) #这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
while True: #循环
    try:
        print 'waiting client connection .......'
        #connection代表客户端对象,address是客户端的IP
        connection,address = sk.accept()
        #等待接收客户端信息
        client_messge = connection.recv(1024)
        #打印客户端信息
        print address
        #发送回执信息给client 收发必须相同
        connection.sendall('hello Client this server')
        connection.send()
        #关闭和client的连接
        connection.close()
    except Exception as e:
        print e
    time.sleep(4)
sk.setblocking(False)

看上面的代码,我修改了setblocking的值,那么现在accept()将不再阻塞。所以他类似下面的图:

wKioL1cQryqAI3FQAAEVAn5TKfk333.png
wKioL1cQryqAI3FQAAEVAn5TKfk333.png

EWOULDBLOCK 意思是说:该操作可能会被阻塞。E是error,WOULD BLOCK是可能会被阻塞的意思。

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从 用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次 发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。 所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。     非阻塞的接口相比于阻塞型接口的显著差异在于,在被调用之后立即返回。python中的  sk.setblocking(False)   accept() 将不会阻塞

四、多路复用IO(IO multiplexing)

IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select /epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。就通知用户进程。它的流程如图:

wKiom1cQrreQ4FV5AAD5KpRUfHU099.png
wKiom1cQrreQ4FV5AAD5KpRUfHU099.png

I/O多路复用指:通过一种机制,可以监视多个描述符,监听的描述符发生了改变,比如可读了或者可写了,一旦它发生了改变,那我就可以得到一个回调信息或者我主动的去知道系统发生变化了!

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

根据系统不同:他支持的也不同

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化。

普通文件操作所有系统都是完成不了的,普通文件是属于I/O操作!但是对于python来说文件变更python是监控不了的,所以我们能用的只有是“终端的输入输出,Socket的输入输出”

对于Select:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)
 
参数: 可接受四个参数(前三个必须)
返回值:三个列表
 
select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当参数1序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当参数2序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当参数3序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当超时时间未设置,则select会一直阻塞,直到监听的句柄发生变化
5、当超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

利用select监听终端操作实例

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import selectimport sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)    '''select.select([sys.stdin,],[],[],1)用到I/O多路复用,第一个参数是列表,我放进去的是stdin就是我输入进去东西的描述符,
       相当于打开一个文件,和obj = socket(),类似的文件描述符,
       sys.stdin 他只是一个特殊的文件描述符= 终端的输入,一旦你输入OK select I/O多路复用他就感知到了。
       先看readable这个参数,其他的县不用看一旦你发生了我就他他发到readable里了,
       这里添加的就是修改的那个文件描述符,如果你一直没有修改过,那么readable他就是一个空的列表    '''
    if sys.stdin in readable:        print 'select get stdin',sys.stdin.readline()'''注:
1、[sys.stdin,]  以后不管是列表还是元组在最后的元素后面建议增加一个逗号,那元组举例(1,) | (1) 这两个有区别吗?是不是第二个
更像方法的调用或者函数的调用,加个,是不是更容易分清楚。还有就是在以后写django的配置文件的时候,他是必须要加的。写作习惯
2、select第一个参数他就是监听多个文件句柄,当谁改变了我是不是就可以监听到!
3、select参数里1是超时时间,当到slect那一行后,如果这里还是没有输入,那么我就继续走!''''''
when runing the program get error :
Traceback (most recent call last):
  File "E:/study/GitHub/homework/tianshuai/share_3_select_socket.py", line 8, in <module>
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
select.error: (10093, 'Either the application has not called WSAStartup, or WSAStartup failed')

when windows only use select socket !!!!!'''

利用select监听终端操作实例

#/usr/bin/env python
#-*- coding:utf-8 -*-
import time
import socket
import select
#创建socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt#设置监听的IP与端口
sk.bind(('127.0.0.1',6666))#设置client最大等待连接数sk.listen(5)
sk.setblocking(False)#这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
while True:
    readable_list, writeable_list, error_list = select.select([sk,],[],[],2) #监听第一个列表的文件描述符,如果里面有文件描述符发生改变既能捕获并放到readable_list中
    for r in readable_list:    #如果是空列表将不执行,如果是空列表。将执行。
        conn,addr = r.accept()        
        print addr

执行程序并打开IE输入地址:127.0.0.1:6666

('127.0.0.1', 53606)

利用select监听多端口操作实例

#/usr/bin/env python
#-*- coding:utf-8 -*-
import time
import socket
import select

#创建socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6666))
#设置client最大等待连接数
sk.listen(5)
sk.setblocking(False) #这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
sk1 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk1.setsockopt
#设置监听的IP与端口
sk1.bind(('127.0.0.1',7777))
#设置client最大等待连接数
sk1.listen(5)
sk1.setblocking(False) #这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
while True:
    readable_list, writeable_list, error_list = select.select([sk,sk1,],[],[],2)  #监听第一个列表的文件描述符,如果里面有文件描述符发生改变既能捕获并放到readable_list中
    for r in readable_list:#如果是空列表将不执行,如果是空列表。将执行。
        conn,address = r.accept()
        print address

select socket mulit port

('127.0.0.1', 53809)
('127.0.0.1', 53811)

利用select模拟伪Socket Server操作实例

select socket server - server

#/usr/bin/env python
#-*- coding:utf-8 -*-
import time
import socket
import select
#创建socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
#设置监听的IP与端口
sk.bind(('127.0.0.1',6666))
#设置client最大等待连接数
sk.listen(5)
sk.setblocking(False) #这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
inputs = [sk,] #将sk这个对象加入到列表中,并且赋值给inputs
#原因:看上例conn是客户端对象,客户是一直连接着呢,连接的时候状态变了,连接上之后,连接上之后,还是服务端的socket 有关吗?
#是不是的把他改为动态的?
while True:
    readable_list, writeable_list, error_list = select.select(inputs,[],[],1)  #把第一个参数设为列表动态的添加
    time.sleep(2) #测试使用
    print "inputs list :",inputs     #打印inputs列表,查看执行变化
    print "file descriptor :",readable_list #打印readable_list ,查看执行变化
    for r in readable_list:
        if r == sk:  #这里判断,如果是客户端连接过来的话他不是sk,如果是服务端的socket连接过来的话是sk
            conn,address = r.accept()
            inputs.append(conn)
            print address
        else:
        #如果是客户端,接受和返回数据
            client_data = r.recv(1024)
            r.sendall(client_data)

select socket server - client

#!/usr/bin/env python
#-*- coding:utf-8 -*-
import socket
client = socket.socket()
client.connect(('127.0.0.1',6666))
client.settimeout(5)
while True:
    client_input = raw_input('please input message:').strip()
    client.sendall(client_input)
    server_data = client.recv(1024)
    print server_data

交互过程:

#1 默认,sk这个对象文件句柄就在inputs列表中select监听客户端的请求,当有客户端请求过来 client1 ---> server
#用户捕获了变化readable_list = [sk,]  那么循环是有值得,判断r = sk 说明是一个新的请求链接,然后把client链接加入到inputs里 inputs = [sk,conn1,]
#如果现在什么都不做,那么select无法捕获到变化:readable_list = []
#执行看下:
inputs list : [<socket._socketobject object at 0x0000000002C66798>] #默认inputs list 就有一个server socket sk 对象
file descriptor : [<socket._socketobject object at 0x0000000002C66798>]  #当有客户端请求过来时候,sk发生了变化,select捕获到了
('127.0.0.1', 62495)
inputs list : [<socket._socketobject object at 0x0000000002C66798>, <socket._socketobject object at 0x0000000002C66800>]  #第二次循环的时候,inputs = [sk,conn1,]
file descriptor : [] #第二次循环的时候readable_list = [] 因为客户端没有做任何操作,没有捕获到变化所以为空
#2 又有一个新的链接过来了,谁变化了?  sk 他变化了,有人向他发起了一个请求链接,那么现在inputs = [sk,conn1,conn2]  readable_list = [sk]
#本次循环完成之后再循环的时候 inputs = [sk,conn1,conn2,]  readable_list = [] 因为我们没有继续做操作
#第一个链接
inputs list : [<socket._socketobject object at 0x0000000002C56798>]  #默认只有一个对象
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>]  
file descriptor : [<socket._socketobject object at 0x0000000002C56798>] #当捕获到,判断是否是新链接,如果是加入到inputs列表中监控
('127.0.0.1', 62539)
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>]  #inputs列表变更为了[sk,conn1]
file descriptor : []  #因为没有后续的操作,这里没有捕获到异常所以列表为空
#第二个链接
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>]  #第一个链接没有做任何操作
file descriptor : [<socket._socketobject object at 0x0000000002C56798>] #第二个链接过来了被捕获到,判断是否为新链接
('127.0.0.1', 62548)
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>] #加入到inputs列表中
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>]
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>]
file descriptor : []

优化:当client端退出后,在inputs列表中移除对象!

select socket server - server release client-connect

#/usr/bin/env python
#-*- coding:utf-8 -*-
import time
import socket
import select
#创建socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
#设置监听的IP与端口
sk.bind(('127.0.0.1',6666))
#设置client最大等待连接数
sk.listen(5)
sk.setblocking(False) #这里设置setblocking为Falseaccept将不在阻塞,但是如果没有收到请求就会报错
inputs = [sk,] #将sk这个对象加入到列表中,并且赋值给inputs
#原因:看上例conn是客户端对象,客户是一直连接着呢,连接的时候状态变了,连接上之后,连接上之后,还是服务端的socket 有关吗?
#是不是的把他改为动态的?
while True:
    readable_list, writeable_list, error_list = select.select(inputs,[],[],1)  #把第一个参数设为列表动态的添加
    time.sleep(2) #测试使用
    print "inputs list :",inputs     #打印inputs列表,查看执行变化
    print "file descriptor :",readable_list #打印readable_list ,查看执行变化
    for r in readable_list:
        if r == sk:  #这里判断,如果是客户端连接过来的话他不是sk,如果是服务端的socket连接过来的话是sk
            conn,address = r.accept()
            inputs.append(conn)
            print address
        else:
        #如果是客户端,接受和返回数据
            client_data = r.recv(1024)
            if client_data:
                r.sendall(client_data)
            else:
                inputs.remove(r)#如果没有收到客户端端数据,则移除客户端句柄 因为,不管是正常关闭还是异常关闭,client端的系统底层都会发送一个消息

通过I/O多路复用让socket实现了处理多个客户端的方法,参数注解:

#第一个参数,监听的句柄序列,当有变动的时候就能捕获到把值赋值给readable_list
#如果第二参数有参数,即只要不是空列表,select就能感知,然后writeabled_list就能获取值
#第三个参数监听描述符,select内部,检测列表里面的描述符在底层操作的时候有没有异常,如果异常了他也当成一个变化,把这个赋值给error_list 一般第三个参数和第一个参数相同
#第四个参数,阻塞时间,如 1秒(这个如果不写,select会阻塞住,直到监听的描述符发生变化才继续往下执行)
readable_list, writeable_list, error_list = select.select(inputs,[],[],1)

对于I/O多路复用,咱们上面的例子就可以了,但是为了遵循select规范需要把读和写进行分离:

#rlist -- wait until ready for reading  #等待直到有读的操作
#wlist -- wait until ready for writing  #等待直到有写的操作
#xlist -- wait for an ``exceptional condition'' #等待一个错误的情况

读和写他共享接收的数据,仅仅靠变量是完成不了的,还的需要借助外界的字典,字典里为每一个客户度维护了一个队列。收到的信息都放到队列了,然后返回的时候直接从队列里拿就可以了

Queue 队列

队列的特点:

1、队列是先进先出,栈是相反的,后进先出

2、队列是线程安全的

import Queue

q = Queue.Queue() #调用队列生成对象

q.put(1)  #存放第一个值到队列

q.put(2)  #存放第二个值到队列

print 'get frist one:',q.get() #获取队列的第一个值

print 'get second on:',q.get() #获取队列的第二个值

先进先出原则第一次存放的是1,第二次存放的是2,那么我们在获取值得时候,第一次获取的就是1,第二次就是2

看下面的例子如果队列里没有值怎么办?他会等待直到有数据为止:

q = Queue.Queue() #调用队列生成对象

q.put(1)  #存放第一个值到队列

q.put(2)  #存放第二个值到队列

a = q.get() #获取队列的第一个值

print 'get frist one:%s' % a

b = q.get() #获取队列的第二个值

print 'get second one:%s' % b

c = q.get()#获取队列的第三个值

print 'get third one:%s' % c

#结果:

'''

get frist one:1

get second one:2

#这里一直在等待着值进来~

'''

如果不想让他等待,不管是否队列里都取数据,可以使用get_nowait,但是如果队列中没有数据就会报错!

q = Queue.Queue() #调用队列生成对象

q.put(1)  #存放第一个值到队列

q.put(2)  #存放第二个值到队列

a = q.get() #获取队列的第一个值

print 'get frist one:%s' % a

b = q.get() #获取队列的第二个值

print 'get second one:%s' % b

c = q.get_nowait()#获取队列的第三个值 ,使用:get_nowait()

print 'get third one:%s' % c

如果队列为空的时候可以通过异常处理进行捕获:

q = Queue.Queue() #调用队列生成对象try:
    q.get_nowait()except Queue.Empty as f:    print 'The Queue is empty!'

同样的如果队列长度为2,如果队列满了之后,同样他也是等待,直到有位置才会继续如下代码:

q = Queue.Queue(2) #调用队列生成对象

q.put(1)  #存放第一个值到队列

print 'put value 1 done'

q.put(2)  #存放第二个值到队列

print 'put vlaue 2 done'

q.put(3) #存放第三个值到队列

print 'put value 3 done'

#结果:

'''

put value 1 done

put vlaue 2 done

#这里会一直等待~

'''

同样如果存放数值的时候如果不想让他等待,使用put_nowait()但是队列无法存放后会报错!

q = Queue.Queue(2) #调用队列生成对象

q.put(1)  #存放第一个值到队列

print 'put value 1 done'

q.put(2)  #存放第二个值到队列

print 'put vlaue 2 done'

q.put_nowait(3) #存放第三个值到队列,如果使用put_nowait()队列无法存放后会报错!

print 'put value 3 done'

#结果:

'''

put value 1 done

put vlaue 2 done

#这里会一直等待~

利用select模拟伪Socket Server操作实例并把读/写进行分离

select socket server - server read | write separation

#!/usr/bin/env python
#-*- coding:utf-8 -*-
__author__ = 'luo_t'
import select
import socket
import Queue
import time
sk = socket.socket()
sk.bind(('127.0.0.1',6666))
sk.listen(5)
sk.setblocking(False) #定义非阻塞
inputs = [sk,]  #定义一个列表,select第一个参数监听句柄序列,当有变动是,捕获并把socket server加入到句柄序列中
outputs = [] #定义一个列表,select第二个参数监听句柄序列,当有值时就捕获,并加入到句柄序列
message = {}
#message的样板信息
#message = {
#    'c1':队列,[这里存放着用户C1发过来的消息]例如:[message1,message2]
#    'c2':队列,[这里存放着用户C2发过来的消息]例如:[message1,message2]
#}
while True:
    readable_list, writeable_list, error_list = select.select(inputs,outputs,[],1)
    #文件描述符可读 readable_list    只有第一个参数变化时候才捕获,并赋值给readable_list
    #文件描述符可写 writeable_list   只要有值,第二个参数就捕获并赋值给writeable_list
    #time.sleep(2)
    print 'inputs:',inputs
    print 'output:'
    print 'readable_list:',readable_list
    print 'writeable_list:',writeable_list
    print 'message',message
    for r in readable_list: #当readable_list有值得时候循环
        if r == sk:  #判断是否为链接请求变化的是否是socket server
            conn,addr = r.accept() #获取请求
            inputs.append(conn) #把客户端对象(句柄)加入到inputs里
            message[conn] = Queue.Queue() #并在字典里为这个客户端连接建立一个消息队列
        else:
            client_data = r.recv(1024) #如果请求的不是sk是客户端接收消息
            if client_data:#如果有数据
                outputs.append(r)#把用户加入到outpus里触发select第二个参数
                message[r].put(client_data)#在指定队列中插入数据
            else:
                inputs.remove(r)#没有数据,删除监听链接
                del message[r] #当数据为空的时候删除队列~~
    for w in writeable_list:#如果第二个参数有数据
        try:
            data = message[w].get_nowait()#去指定队列取数据 并且不阻塞
            w.sendall(data) #返回请求输入给client端
        except Queue.Empty:#反之触发异常
            pass
        outputs.remove(w) #因为第二个参数有值得时候就触发捕获值,所以使用完之后需要移除它
        #del message[r]
    print '%s' %('-' * 40)

使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。 但这个模型依旧有着很多问题。首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很 多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了queue,Solaris提供了/dev/poll,…。如果需要实现 更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具 有较好跨平台能力的服务器会比较困难。 其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。如下例,庞大的执行体1的将直接导致响应事件2的执行体迟迟得不到执行,并在很大程度上降低了事件探测的及时性。

wKiom1cQsY3g788xAABHoSiG814402.jpg
wKiom1cQsY3g788xAABHoSiG814402.jpg

五、异步I/O(asynchronous IO)

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都 完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

用异步IO实现的服务器这里就不举例了,以后有时间另开文章来讲述。异步IO是真正非阻塞的,它不会对请求进程产生任何的阻塞,因此对高并发的网络服务器实现至关重要。

到目前为止,已经将四个IO模型都介绍完了。现在回过头来看下面的问题:

1、blocking和non-blocking的区别在哪?

2、synchronous IO和asynchronous IO的区别在哪?

回答:

1、blocking和non-blocking的区别在哪?

blocking与non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还在准备数据的情况下会立刻返回。

wKiom1cQsffQ8YONAABoCoKdT4o108.png
wKiom1cQsffQ8YONAABoCoKdT4o108.png
wKioL1cQsrGhfhNlAAEVAn5TKfk099.png
wKioL1cQsrGhfhNlAAEVAn5TKfk099.png

2、synchronous IO和asynchronous IO的区别在哪?

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
  • An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个系统调用。

non-blocking IO在执行recvfrom这个系统调用的时候,如果kernel的数据没有准备好,这时候不会block进程。但是当kernel中数据准备好的时 候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内进程是被block的。

wKiom1cQshryiEKvAAEXCZSO4vs913.png
wKiom1cQshryiEKvAAEXCZSO4vs913.png

而asynchronous IO则不一样,当进程发起IO操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

六、I/O多路复用的应用场景

#(1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。

#(2)当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

#(3)如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

#(4)如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

#(5)如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

'''与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。'''

最后,再举几个不是很恰当的例子来说明这四个IO Model: 有A,B,C,D四个人在钓鱼: A用的是最老式的鱼竿,所以呢,得一直守着,等到鱼上钩了再拉杆;【阻塞】 B的鱼竿有个功能,能够显示是否有鱼上钩(这个显示功能一直去判断鱼是否上钩),所以呢,B就和旁边的MM聊天,隔会再看看有没有鱼上钩,有的话就迅速拉杆;【非阻塞】 C用的鱼竿和B差不多,但他想了一个好办法,就是同时放好几根鱼竿,然后守在旁边,一旦有显示说鱼上钩了,它就将对应的鱼竿拉起来;【同步】 D是个有钱人,干脆雇了一个人帮他钓鱼,一旦那个人把鱼钓上来了,就给D发个短信(消息回掉机制,主动告知)。【异步】

参考资料:

http://www.cnblogs.com/wupeiqi/articles/5040823.html

http://blog.chinaunix.net/uid-28458801-id-4464639.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档