专栏首页python3IO多路复用丶基于IO多路复用+sock

IO多路复用丶基于IO多路复用+sock

一丶IO多路复用

  IO多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作

  IO多路复用作用:

    检测多个socket是否已经发生变化(是否已经连接成功/是否已经获取数据)(可读/可写)

    操作系统检测socket是否发生变化有三种模式:

      select:最多1024个socket,循环去检测

      poll:不限制监听socket个数,循环去检测(水平触发)

      epoll:不限制监听socket个数:回调方式(边缘触发).

    Python模块:

      select.select

      select.epoll

  Python中有一个select模块,其中提供了:select丶poll丶epoll三个方法,分别调用系统的select,poll,epoll从而实现IO多路复用

  注意: 网络操作丶文件操作丶终端操作等均属于IO操作,对于windows只支持socket操作,其他系统支持其他IO操作,但是无法检测普通文件操作,自动上次读取是否已经变化

二丶基于IO多路复用+socket实现并发请求(一个线程100个请求)

  当我们需要向百度发送请求搜索三个关键字,我们改怎么办呢?

  单线程解决并发:

方式一:

key_list = ['alex','db','sb']
for item in key_list:
    ret = requests.get('https://www.baidu.com/s?wd=%s' %item)

View Code

方式二:

def get_data(key):
    # 方式二
    client = socket.socket()

    # 百度创建连接: 阻塞
    client.connect(('www.baidu.com',80))

    # 问百度我要什么?
    client.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')

    # 我等着接收百度给我的回复
    chunk_list = []
    while True:
        chunk = client.recv(8096)
        if not chunk:
            break
        chunk_list.append(chunk)

    body = b''.join(chunk_list)
    print(body.decode('utf-8'))

key_list = ['alex','db','sb']
for item in key_list:
    get_data(item)

View Code

  多线程解决并发:

import threading

key_list = ['alex','db','sb']
for item in key_list:
    t = threading.Thread(target=get_data,args=(item,))
    t.start()

View Code

  前面这几个程序在给发送连接请求时,必定会阻塞住,在哪儿等待百度给它回消息.我们可以把阻塞的地方变成非阻塞,这样可以一直给百度发送请求了,不要在哪儿傻傻的等待百度给回复了.

单线程的并发:

import socket
import select



client1 = socket.socket()
client1.setblocking(False) # 百度创建连接: 非阻塞

try:
    client1.connect(('www.baidu.com',80))
except BlockingIOError as e:
    pass


client2 = socket.socket()
client2.setblocking(False) # 百度创建连接: 非阻塞
try:
    client2.connect(('www.sogou.com',80))
except BlockingIOError as e:
    pass


client3 = socket.socket()
client3.setblocking(False) # 百度创建连接: 非阻塞
try:
    client3.connect(('www.oldboyedu.com',80))
except BlockingIOError as e:
    pass

socket_list = [client1,client2,client3]
conn_list = [client1,client2,client3]

while True:
    rlist,wlist,elist = select.select(socket_list,conn_list,[],0.005)
    # wlist中表示已经连接成功的socket对象
    for sk in wlist:
        if sk == client1:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
        elif sk==client2:
            sk.sendall(b'GET /web?query=fdf HTTP/1.0\r\nhost:www.sogou.com\r\n\r\n')
        else:
            sk.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.oldboyedu.com\r\n\r\n')
        conn_list.remove(sk)
    for sk in rlist:
        chunk_list = []
        while True:
            try:
                chunk = sk.recv(8096)
                if not chunk:
                    break
                chunk_list.append(chunk)
            except BlockingIOError as e:
                break
        body = b''.join(chunk_list)
        # print(body.decode('utf-8'))
        print('------------>',body)
        sk.close()
        socket_list.remove(sk)
    if not socket_list:
        break

View Code

单线程的并发高级版:

import socket
import select

class Req(object):
    def __init__(self,sk,func):
        self.sock = sk
        self.func = func

    def fileno(self):
        return self.sock.fileno()


class Nb(object):

    def __init__(self):
        self.conn_list = []
        self.socket_list = []

    def add(self,url,func):
        client = socket.socket()
        client.setblocking(False)  # 非阻塞
        try:
            client.connect((url, 80))
        except BlockingIOError as e:
            pass
        obj = Req(client,func)
        self.conn_list.append(obj)
        self.socket_list.append(obj)

    def run(self):

        while True:
            rlist,wlist,elist = select.select(self.socket_list,self.conn_list,[],0.005)
            # wlist中表示已经连接成功的req对象
            for sk in wlist:
                # 发生变换的req对象
                sk.sock.sendall(b'GET /s?wd=alex HTTP/1.0\r\nhost:www.baidu.com\r\n\r\n')
                self.conn_list.remove(sk)
            for sk in rlist:
                chunk_list = []
                while True:
                    try:
                        chunk = sk.sock.recv(8096)
                        if not chunk:
                            break
                        chunk_list.append(chunk)
                    except BlockingIOError as e:
                        break
                body = b''.join(chunk_list)
                # print(body.decode('utf-8'))
                sk.func(body)
                sk.sock.close()
                self.socket_list.remove(sk)
            if not self.socket_list:
                break


def baidu_repsonse(body):
    print('百度下载结果:',body)

def sogou_repsonse(body):
    print('搜狗下载结果:', body)

def oldboyedu_repsonse(body):
    print('老男孩下载结果:', body)


t1 = Nb()
t1.add('www.baidu.com',baidu_repsonse)
t1.add('www.sogou.com',sogou_repsonse)
t1.add('www.oldboyedu.com',oldboyedu_repsonse)
t1.run()

View Code

  什么是异步非阻塞?

    非阻塞,不等待

      比如创建socket对某个地址进行connect丶获取接收数据recv时默认都会等待(连接成功或接收到数据),才执行后续操作,如果设置setblocking(False),以上两个过程就不再等待,但是会报BlockingIOError的错误,只要捕获即可

    异步,通知,执行完成之后自动执行回调函数或自动执行某些操作(通知).

      比如做爬虫中向某个地址baidu.com发送请求,当请求执行完成之后自执行回调函

三丶协程

  协程也可以称为"微线程",就是开发者控制线程执行流程,控制先执行某段代码然后再切换到另外函数执行代码,来回切换

  需要强调的是:

    1.Python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到IO或执行时间过长就会被迫交出CPU权限,切换其他线程运行)

    2.单线程内开启进程,一旦遇到IO,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(非IO操作的切换与效率无关)

  对比操作系统控制线程的切换,用户在单线程内控制协程的切换

  优点如下:

    1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级

    2.单线程内就可以实现并发的效果,最大限度地利用CPU

  缺点如下:

    1.协程的本质是单线程下,无法利用多核,可以使一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程

    2.协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

  总结:

    1.必须在只有一个单线程里实现并发(协程本身无法实现并发)

    2.修改共享数据不需加锁

    3.用户程序里自己保存多个控制流的上下文栈

    4.附加一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield,greenlet都无法实现,就用到了gevent模块(select机制))

Greenlet模块

  安装:pip3 install greenlet

  greenlet实现了状态的切换:

import greenlet


def f1():
    print(11)
    gr2.switch()
    print(22)
    gr2.switch()


def f2():
    print(33)
    gr1.switch()
    print(44)


# 协程 gr1
gr1 = greenlet.greenlet(f1)
# 协程 gr2
gr2 = greenlet.greenlet(f2)

gr1.switch()

View Code

Gevent模块:

  安装:pip3 install gevent

  Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

from gevent import monkey
monkey.patch_all() # 以后代码中遇到IO都会自动执行greenlet的switch进行切换
import requests
import gevent


def get_page1(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page2(url):
    ret = requests.get(url)
    print(url,ret.content)

def get_page3(url):
    ret = requests.get(url)
    print(url,ret.content)

gevent.joinall([
    gevent.spawn(get_page1, 'https://www.python.org/'), # 协程1
    gevent.spawn(get_page2, 'https://www.yahoo.com/'),  # 协程2
    gevent.spawn(get_page3, 'https://github.com/'),     # 协程3
])

View Code

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • windows中Python串口编程(一

    在windows中,使用python进行串口编程需要安装一个Serial模块 pyserial: 下载地址:https://pypi.python.org/...

    py3study
  • Windows平台python操作串口示

    在windows中,使用Python进行串口编程需要安装一个Serial模块pyserial:

    py3study
  • python3.2列表操作总结

    list操作:快速创建list、新增item、删除item、重新赋值item、颠倒item顺序、检索item

    py3study
  • gevent实现静态web服务器(协程实现)小结

    写在前面 为提高web服务器的服务质量,一般通过多线程/多进程实现多任务来服务大量用户,但线程和进程往往要消耗较多的系统资源,而且如果线程/进程数达到一个较大...

    zhaoolee
  • 基于Kafka构建事件溯源模式的微服务

    微服务本身并不算什么新概念,它要解决的问题在软件工程历史中早已经有人提出:解耦、扩展性、灵活性,解决“烂架构”膨胀后带来的复杂度问题。

    RiboseYim
  • 巴菲特评科技股:投资 IBM 是个错误,还会增持苹果,亚马逊简直是奇迹

    北京时间 5 月 6 日,跨国投资及控股集团公司伯克希尔·哈撒韦(Berkshire Hathaway)在美国奥马哈总部召开了一年一次的全球股东大会,创始人股神...

    AI科技大本营
  • 浏览器输入URL回车之后发生了什么?(超详细版)

    这个问题已经是老生常谈了,更是经常被作为面试的压轴题出现,网上也有很多文章,但最近闲的无聊,然后就自己做了一篇笔记,感觉比之前理解更透彻了。

    前端迷
  • python3-socketserver

    这个框架包括了BaseHTTPServer , SimpleHTTPServer , CGIHTTPServer , SimpleXMLRPCServer , ...

    py3study
  • 开源一个博客小程序

    之前就一直想做一个博客小程序方便在手机上查阅,遇到了一些问题迟迟没有推进。前一段时间才把丑陋的初版做出来ಥ_ಥ。

    薛定喵君
  • ThunderBoard与EFR32MG初印象-Silabs厦门workshop笔记

    今天参加了Silabs和世强在厦门的workshop,对Silabs的EFR32MG方案印象深刻,值得一提的是,Silabs原厂的FAE梅汉忠特别有型,顶着一头...

    twowinter

扫码关注云+社区

领取腾讯云代金券