前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python 协程

Python 协程

原创
作者头像
ruochen
修改2021-05-11 14:39:05
1.5K0
修改2021-05-11 14:39:05
举报

协程

迭代器

  • 可迭代(Iterable):直接作用于for循环的变量
  • 迭代器(Iterator):不但可以作用于for循环,还可以被next调用
  • list是典型的可迭代对象,但不是迭代器
  • 通过isinstance判断
  • iterable 和 iterator可以转换
    • 通过iter函数
代码语言:txt
复制
# 可迭代
l = [i for i in range(10)]
# l是可迭代的,但不是迭代器
for idx in l:
    print(idx)
    
# range是个迭代器
for i in range(5):
    print(i)
代码语言:txt
复制
0
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
3
代码语言:txt
复制
4
代码语言:txt
复制
5
代码语言:txt
复制
6
代码语言:txt
复制
7
代码语言:txt
复制
8
代码语言:txt
复制
9
代码语言:txt
复制
0
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
3
代码语言:txt
复制
4
代码语言:txt
复制
# isinstance案例
# 判断某个变量是否是一个实例

# 判断是否可迭代
from collections import Iterable
ll = [1, 2, 3, 4, 5]

print(isinstance(ll, Iterable))

from collections import Iterator
print(isinstance(ll, Iterator))
代码语言:txt
复制
True
代码语言:txt
复制
False
代码语言:txt
复制
# iter函数

s = 'i love you'

print(isinstance(s, Iterable))
print(isinstance(s, Iterator))

s_iter = iter(s)
print(isinstance(s_iter, Iterable))
print(isinstance(s_iter, Iterator))
代码语言:txt
复制
True
代码语言:txt
复制
False
代码语言:txt
复制
True
代码语言:txt
复制
True

生成器

  • generator:一边循环一边计算下一个元素的机制/算法
  • 需要满足三个条件
    • 每次调用都生产出for循环需要的下一个元素或者
    • 如果达到最后一个后,爆出StopIteration异常
    • 可以被next函数调用
  • 如何生成一个生成器
    • 直接使用
    • 如果函数中包含yield,则这个函数就叫生成器
    • next调用函数,遇到yield返回
代码语言:txt
复制
# 直接使用生成器

L = [x*x for x in range(5)] # 放在中括号中是列表生成器
g = (x*x for x in range(5)) # 放在小括号中就是生成器

print(type(L))
print(type(g))
代码语言:txt
复制
<class 'list'>
代码语言:txt
复制
<class 'generator'>
代码语言:txt
复制
# 函数案例

def odd():
    print("Step 1")
    print("Step 2")
    print("Step 3")
    return None

odd()
代码语言:txt
复制
Step 1
代码语言:txt
复制
Step 2
代码语言:txt
复制
Step 3
代码语言:txt
复制
# 生成器的案例
# 在函数odd中, yield负责返回
def odd():
    print("Step 1")
    yield 1
    print("Step 2")
    yield 2
    print("Step 3")
    yield 3

# odd() 是调用生成器
g = odd()

one = next(g)
print(one)

two = next(g)
print(two)


three = next(g)
print(three)
代码语言:txt
复制
Step 1
代码语言:txt
复制
1
代码语言:txt
复制
Step 2
代码语言:txt
复制
2
代码语言:txt
复制
Step 3
代码语言:txt
复制
3
代码语言:txt
复制
# for循环调用生成器
# 斐波那契数列
def fib(max):
    n, a, b = 0, 0, 1 # 注意写法
    while n < max:
        print(b)
        a, b = b, a+b # 注意写法
        n += 1
        
    return 'Done'

fib(5)
代码语言:txt
复制
1
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
3
代码语言:txt
复制
5
代码语言:txt
复制
'Done'
代码语言:txt
复制
# 斐波那契数列的生成器写法
def fib(max):
    n, a, b = 0, 0, 1 # 注意写法
    while n < max:
        yield b
        a, b = b, a+b # 注意写法
        n += 1
       
    # 需要注意,爆出异常时的返回值是return的返回值
    return 'Done'

g = fib(5)

for i in range(6):
    rst = next(g)
    print(rst)
代码语言:txt
复制
1
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
3
代码语言:txt
复制
5
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
StopIteration                             Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-34-71c1fd16a38d> in <module>
代码语言:txt
复制
     13 
代码语言:txt
复制
     14 for i in range(6):
代码语言:txt
复制
---> 15     rst = next(g)
代码语言:txt
复制
     16     print(rst)
代码语言:txt
复制
StopIteration: Done
代码语言:txt
复制
ge = fib(10)
'''
生成器的典型用法是在for中使用
比较常用的典型生成器就是range
'''
for i in ge: # 在for循环中使用生成器
    print(i)
代码语言:txt
复制
1
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
3
代码语言:txt
复制
5
代码语言:txt
复制
8
代码语言:txt
复制
13
代码语言:txt
复制
21
代码语言:txt
复制
34
代码语言:txt
复制
55

协程

  • 历史历程
    • 3.4引入协程,用yield实现
    • 3.5引入协程语法
    • 实现的协程比较好的包有asyncio,tornado,gevent
  • 定义:协程 是为非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不用位置暂停或者执行程序。
  • 从技术角度讲,协程就是一个你可以暂停执行的函数,或者干脆把协程理解成生成器
  • 协程的实现:
    • yield返回
    • send调用
  • 协程的四个状态
    • inspect.getgeneratorstate(...) 函数确定,该函数会返回下述字符串中的一个:
    • GEN_CREATED:等待开始执行
    • GEN_RUNNING:解释器正在执行
    • GEN_SUSPENED:在yield表达式处暂停
    • GEN_CLOSED:执行结束
    • next预激(prime)
    • 代码案例v2
  • 协程终止
    • 协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)
    • 终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和 Ellipsis 等常量经常用作哨符值==。
  • yield from
    • 调用协程为了得到返回值,协程必须正常终止
    • 生成器正常终止会发出StopIteration异常,异常对象的vlaue属性保存返回值
    • yield from从内部捕获StopIterator异常
    • 案例v03
    • 委派生成器
      • 包含yield from表达式的生成器函数
      • 委派生成器在yield from表达式处暂停,调用方可以直接把数据发给子生成器
      • 子生成器再把产出的值发给调用方
      • 子生成器在最后,解释器会抛出StopIteration,并且把返回值附加到异常对象上
      • 案例v04
代码语言:txt
复制
# 协程代码案例1

def simple_coroutine():
    print('-> start')
    x = yield
    print('-> recived', x)
    
# 主线程
sc = simple_coroutine()
print(1111)
# 可以使用sc.send(None), 效果一样
next(sc) # 预激

print(2222)
sc.send('zhuxiao')
代码语言:txt
复制
1111
代码语言:txt
复制
-> start
代码语言:txt
复制
2222
代码语言:txt
复制
-> recived zhuxiao
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
StopIteration                             Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-38-daec0bf83892> in <module>
代码语言:txt
复制
     13 
代码语言:txt
复制
     14 print(2222)
代码语言:txt
复制
---> 15 sc.send('zhuxiao')
代码语言:txt
复制
StopIteration: 
代码语言:txt
复制
# 案例v2,协程的状态

def simple_coroutine(a):
    print('-> start')
    
    b = yield a
    print('-> recived', a, b)
    
    c = yield a + b
    print('-> recived', a, b, c)
    
# run
sc = simple_coroutine(5)

aa = next(sc)
print(aa)
bb = sc.send(6) # 5, 6
print(bb)
cc = sc.send(7) # 5, 6, 7
print(cc)
代码语言:txt
复制
-> start
代码语言:txt
复制
5
代码语言:txt
复制
-> recived 5 6
代码语言:txt
复制
11
代码语言:txt
复制
-> recived 5 6 7
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
StopIteration                             Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-40-1f54596be296> in <module>
代码语言:txt
复制
     17 bb = sc.send(6) # 5, 6
代码语言:txt
复制
     18 print(bb)
代码语言:txt
复制
---> 19 cc = sc.send(7) # 5, 6, 7
代码语言:txt
复制
     20 print(cc)
代码语言:txt
复制
StopIteration: 
代码语言:txt
复制
# 案例v03

def gen():
    for c in 'AB':
        yield c
        
# list直接用生成器作为参数
print(list(gen()))

def gen_new():
    yield from 'AB'
    
print(list(gen_new()))
代码语言:txt
复制
['A', 'B']
代码语言:txt
复制
['A', 'B']
代码语言:txt
复制
# 案例v04,委派生成器

from collections import namedtuple

'''
解释:
1. 外层 for 循环每次迭代会新建一个 grouper 实例,赋值给 coroutine 变量;grouper 是委派生成器。
2. 调用 next(coroutine), 预激委派生成器 grouper,此时进入 while True 循环,调用子生成器 averager 后,在 yield from 表达式处暂停。
3. 内层 for 循环调用 coroutine.send(value), 直接把值传给子生成器 averager。同时,当前的 grouper 实例(coroutine)在 yield from 表达式处暂停。
4. 内层循环结束后,grouper 实例依旧在 yield form 表达式处暂停。因此,grouper 函数定义体中为 results[key] 赋值的语句还没有执行。
5. coroutine.send(None) 终止 averager 子生成器,子生成器抛出 StopIteration 异常并将返回的数据包含在异常对象的 value 中,yield from 可以直接抓取 StopIteration 异常并将异对象的 value 赋值给 results[key].
'''
ResClass = namedtuple('Res', 'count average')

# 子生成器
def averager(): 
    total = 0.0
    count = 0
    average = None
    
    while True:
        term = yield
        # None是哨兵值
        if term is None:
            break
        total += term
        count += 1
        average = total / count
    return ResClass(count, average)

# 委派生成器
def grouper(storages, key):
    while True:
        # 获取average()返回的值
        storages[key] = yield from averager()
        
# 客户端代码
def client():
    process_data = {
        'boys_2': [39.0, 40.8, 43.5, 43.6, 38.5, 49.5, 34.5, 43.5, 34.7],
        'boys_1': [1.78, 1.79, 1.43, 1.23, 1.34, 1.54, 1.43, 1.53, 1.89]
    }
    
    storages = {}
    for k, v in process_data.items():
        # 获得协程
        coroutine = grouper(storages, k)
        
        # 预激协程
        next(coroutine)
        
        # 发送数据到协程
        for dt in v:
            coroutine.send(dt)
            
        # 终止协程
        coroutine.send(None)
    print(storages)
    
# run
client()
代码语言:txt
复制
{'boys_2': Res(count=9, average=40.84444444444444), 'boys_1': Res(count=9, average=1.551111111111111)}

asyncio

  • python3.4开始引入标准库当中,内置对异步io的支持
  • asyncio本身是一个消息循环
  • 步骤:
    • 创建消息循环
    • 把协程导入
    • 关闭
代码语言:txt
复制
import threading
# 引入异步io包
import asyncio

# 使用协程
@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    print('Start.... (%s)' % threading.currentThread())
    yield from asyncio.sleep(5)
    print('Done.... (%s)' % threading.currentThread())
    print('Hello again! (%s)' % threading.currentThread())
    
# 启动消息循环
loop = asyncio.get_event_loop()
# 定义任务
tasks = [hello(), hello()]
# asyncio使用wait等待task执行完毕
loop.run_until_complete(asyncio.wait(tasks))
# 关闭消息循环
loop.close()
    
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
RuntimeError                              Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-8-377939537d5d> in <module>
代码语言:txt
复制
     17 tasks = [hello(), hello()]
代码语言:txt
复制
     18 # asyncio使用wait等待task执行完毕
代码语言:txt
复制
---> 19 loop.run_until_complete(asyncio.wait(tasks))
代码语言:txt
复制
     20 # 关闭消息循环
代码语言:txt
复制
     21 loop.close()
代码语言:txt
复制
D:\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
代码语言:txt
复制
    569         future.add_done_callback(_run_until_complete_cb)
代码语言:txt
复制
    570         try:
代码语言:txt
复制
--> 571             self.run_forever()
代码语言:txt
复制
    572         except:
代码语言:txt
复制
    573             if new_task and future.done() and not future.cancelled():
代码语言:txt
复制
D:\Anaconda3\lib\asyncio\base_events.py in run_forever(self)
代码语言:txt
复制
    524         self._check_closed()
代码语言:txt
复制
    525         if self.is_running():
代码语言:txt
复制
--> 526             raise RuntimeError('This event loop is already running')
代码语言:txt
复制
    527         if events._get_running_loop() is not None:
代码语言:txt
复制
    528             raise RuntimeError(
代码语言:txt
复制
RuntimeError: This event loop is already running
代码语言:txt
复制
Hello world! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Start.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello world! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Start.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Done.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello again! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Done.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello again! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
import asyncio

@asyncio.coroutine
def wget(host):
    print('wget %s...' % host)
    # 异步请求网络地址
    connect = asyncio.open_connection(host, 80)
    # 注意yield from的用法
    reader, writer = yield from connect
    header = 'Get / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    write.write(header.encode('utf-8'))
    yield from write.drain()
    while True:
        line = yield from reader.readline()
        # http协议的换行使用\r\n
        if line == b'\r\n':
            break
        print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    write.close()
    
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

asyn and await

  • 为了更好的表示异步io
  • python3.5引入
  • 让协程代码更简洁
  • 使用上,可以简单的进行替换
    • 用 async 替换 @asyncio.coroutine
    • 用 await 替换 yield from
代码语言:txt
复制
import threading
# 引入异步io包
import asyncio

# 使用协程
# @asyncio.coroutine
async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    print('Start.... (%s)' % threading.currentThread())
    await asyncio.sleep(5)
    print('Done.... (%s)' % threading.currentThread())
    print('Hello again! (%s)' % threading.currentThread())
    
# 启动消息循环
loop = asyncio.get_event_loop()
# 定义任务
tasks = [hello(), hello()]
# asyncio使用wait等待task执行完毕
loop.run_until_complete(asyncio.wait(tasks))
# 关闭消息循环
loop.close()
    
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
RuntimeError                              Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-10-ceee64b7a419> in <module>
代码语言:txt
复制
     17 tasks = [hello(), hello()]
代码语言:txt
复制
     18 # asyncio使用wait等待task执行完毕
代码语言:txt
复制
---> 19 loop.run_until_complete(asyncio.wait(tasks))
代码语言:txt
复制
     20 # 关闭消息循环
代码语言:txt
复制
     21 loop.close()
代码语言:txt
复制
D:\Anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
代码语言:txt
复制
    569         future.add_done_callback(_run_until_complete_cb)
代码语言:txt
复制
    570         try:
代码语言:txt
复制
--> 571             self.run_forever()
代码语言:txt
复制
    572         except:
代码语言:txt
复制
    573             if new_task and future.done() and not future.cancelled():
代码语言:txt
复制
D:\Anaconda3\lib\asyncio\base_events.py in run_forever(self)
代码语言:txt
复制
    524         self._check_closed()
代码语言:txt
复制
    525         if self.is_running():
代码语言:txt
复制
--> 526             raise RuntimeError('This event loop is already running')
代码语言:txt
复制
    527         if events._get_running_loop() is not None:
代码语言:txt
复制
    528             raise RuntimeError(
代码语言:txt
复制
RuntimeError: This event loop is already running
代码语言:txt
复制
Hello world! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Start.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello world! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Start.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Done.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello again! (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Done.... (<_MainThread(MainThread, started 22472)>)
代码语言:txt
复制
Hello again! (<_MainThread(MainThread, started 22472)>)

aiohttp

  • asyncio实现单线程的并发io,在客户端用处不大
  • 在服务器端可以asyncio+coroutine配合,因为http是io操作
  • asyncio实现了tcp,udp,ssl等协议
  • aiohttp是给予asyncio实现的http框架
  • pip install aiohttp安装
代码语言:txt
复制
# aiohttp案例

import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_serve(app.make_handler(), '127.0.0.1', 8000)
    print('Serve started at http://127.0.0.1:8000...')
    return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()
代码语言:txt
复制
---------------------------------------------------------------------------
代码语言:txt
复制
ModuleNotFoundError                       Traceback (most recent call last)
代码语言:txt
复制
<ipython-input-4-e9c02c1f95af> in <module>
代码语言:txt
复制
      3 import asyncio
代码语言:txt
复制
      4 
代码语言:txt
复制
----> 5 from aiohttp import web
代码语言:txt
复制
      6 
代码语言:txt
复制
      7 async def index(request):
代码语言:txt
复制
ModuleNotFoundError: No module named 'aiohttp'

concurrent.futures

  • python3新增的库
  • 类似其他语言的线程池的概念
  • 利用multiprocessiong实现真正的并行计算
  • 核心原理:以子进程的形式,并行运行多个python解释器,从而令python程序可以利用多核CPU来提升运行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核。
  • concurrent.futures.Executor
    • ThreadPoolExecutor
    • ProcessPoolExecutor
    • 执行的时候需要自行选择
  • sunmit(fn, args, kwargs)
    • fn: 异步执行的函数
    • args, kwargs参数
代码语言:txt
复制
# 关于concurrent的案例
from concurrent.futures import ThreadPoolExecutor
import time

def return_future(msg):
    time.sleep(3)
    return msg

# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=2)

# 往线程池加入2个task
f1 = pool.submit(return_future, 'hello')
f2 = pool.submit(return_future, 'world')

# 等待执行完毕
print(f1.done())
time.sleep(3)
print(f2.done())

# 结果
print(f1.result())
print(f2.result())
代码语言:txt
复制
False
代码语言:txt
复制
True
代码语言:txt
复制
hello
代码语言:txt
复制
world

current中map函数

  • map(fn, *iterables, timeout=None)
    • 跟map函数类似
    • 函数需要异步执行
    • timeout:超时时间
    • map跟submit使用一个就行
代码语言:txt
复制
# map案例

import time,re
import os,datetime
from concurrent import futures

data = ['1', '2']

def wait_on(argument):
    print(argument)
    time.sleep(2)
    return "OK"

ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on, data):
    print(i)
代码语言:txt
复制
1
代码语言:txt
复制
2
代码语言:txt
复制
OK
代码语言:txt
复制
OK

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 协程
  • 迭代器
  • 生成器
  • 协程
  • asyncio
  • asyn and await
  • aiohttp
  • concurrent.futures
  • current中map函数
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档