
咱们先聊个实际问题:你有没有写过这样的代码 —— 循环调用 10 个 API,每个 API 要等 2 秒,同步跑下来要 20 秒?要是能让这些请求 “同时跑”,2 秒就搞定多好?这就是异步编程要解决的事儿!
今天咱们就用 Python 自带的asyncio库,从基础到实战,把异步编程彻底搞明白。所有代码都能直接复制运行,遇到复杂点我会用比喻讲,保证不绕弯子。
咱们先对比两个生活场景,理解 “同步” 和 “异步” 的区别:
场景 | 同步做法 | 异步做法 |
|---|---|---|
去餐厅吃饭 | 点完菜就坐着等,菜上了才动筷子 | 点完菜先玩会儿手机,服务员喊 “菜好了” 再去吃 |
调用 API | 发一个请求,等结果回来再发下一个 | 发完请求先去处理别的,结果回来再接着处理 |
只适合IO 密集型任务(比如调用 API、读文件、数据库操作)—— 这些任务的大部分时间都在 “等”(等网络响应、等磁盘读写)。
如果是CPU 密集型任务(比如算圆周率、图像处理),异步没用!因为 CPU 一直在干活,没空想别的,这时候得用多进程。
异步编程的核心是 “协程”(可以理解成 “能暂停的函数”),写协程要用到两个关键字:async和await。
协程函数和普通函数的区别,就是多了个async关键字:
# 这是一个协程函数
async def say_hello():
print("Hello, 异步!")
# 模拟“等待”(比如等API响应),用asyncio.sleep,不能用time.sleep!
await asyncio.sleep(1) # 关键:await会让协程“暂停”,去干别的
print("Hello, 又回来了!")⚠️ 注意:普通的time.sleep(1)会阻塞整个程序,而asyncio.sleep(1)是 “异步等待”,会让协程暂停,同时事件循环能去执行其他协程。
协程函数调用后不会直接执行,会返回一个 “协程对象”,必须交给asyncio的 “事件循环” 才能跑起来。
最简单的运行方式是用asyncio.run()(Python 3.7 + 支持):
import asyncio
async def say_hello():
print("Hello, 异步!")
await asyncio.sleep(1)
print("Hello, 又回来了!")
# 运行协程(asyncio.run会自动创建和管理事件循环)
asyncio.run(say_hello())运行结果:
Hello, 异步!
(等待1秒)
Hello, 又回来了!await的作用是 “暂停当前协程,等待另一个可等待对象(比如协程、任务)完成”。
你可以理解成:“我现在要等个结果,这段时间别管我,先去处理别人的事儿,等结果好了再叫我”。
比如两个协程配合:
import asyncio
async def wash_dishes():
print("开始洗碗,要2秒")
await asyncio.sleep(2) # 洗碗时暂停,去干别的
print("碗洗完了")
async def cook_rice():
print("开始煮饭,要3秒")
await asyncio.sleep(3) # 煮饭时暂停,去干别的
print("饭煮好了")
async def main():
# 同时执行两个协程(不是排队等)
await asyncio.gather(wash_dishes(), cook_rice()) # gather是“收集”多个协程
asyncio.run(main())运行结果(总耗时 3 秒,不是 2+3=5 秒):
开始洗碗,要2秒
开始煮饭,要3秒
(等待2秒后)碗洗完了
(再等1秒后)饭煮好了刚才咱们用了asyncio.run()和gather(),背后其实是两个核心东西:事件循环和任务。
事件循环就像餐厅的 “调度员”,负责:
await时,把它挂起,去执行其他能跑的协程await的任务完成后,再把挂起的协程拉回来继续跑你不用手动创建事件循环(asyncio.run()会帮你做),但得知道它的存在 —— 异步所有逻辑都靠它驱动。
很多人会把 “协程对象” 和 “任务对象” 搞混,咱们用表格分清:
类型 | 定义 | 创建方式 | 特点 | 用途 |
|---|---|---|---|---|
协程对象 | async def 函数调用后得到的对象 |
| 不能直接运行,必须靠 await 或任务驱动 | 封装单个异步逻辑 |
任务对象 | 把协程包装成 “可调度的任务” |
| 会被事件循环自动调度,能跟踪状态(是否完成、结果是什么) | 管理异步任务(取消、查状态) |
简单说:任务是 “可管理的协程”,能主动取消、查结果,而协程不行。
比如创建任务并查状态:
import asyncio
async def my_task():
await asyncio.sleep(2)
return "任务完成!"
async def main():
# 1. 创建协程对象
coro = my_task()
print("协程对象类型:", type(coro)) # <class 'coroutine'>
# 2. 把协程包装成任务(会自动加入事件循环)
task = asyncio.create_task(coro)
print("任务对象类型:", type(task)) # <class '_asyncio.Task'>
# 3. 查任务状态
print("任务是否完成?", task.done()) # False(刚创建,还在跑)
# 4. 等待任务完成,获取结果
result = await task
print("任务结果:", result) # 任务完成!
print("任务是否完成?", task.done()) # True(已经跑完)
asyncio.run(main())咱们用最常见的 “调用 API” 场景,对比同步和异步的效率差异。
需要先装个处理异步 HTTP 请求的库:pip install aiohttp(普通的 requests 库是同步的,不能用在异步里)。
假设要调用 3 个 API,每个要 2 秒:
import requests
import time
def fetch_api_sync(url):
"""同步调用API"""
print(f"开始请求:{url}")
response = requests.get(url) # 同步等待,会阻塞
print(f"完成请求:{url},状态码:{response.status_code}")
return response.status_code
def main_sync():
urls = [
"https://httpbin.org/get?name=1",
"https://httpbin.org/get?name=2",
"https://httpbin.org/get?name=3"
]
start_time = time.time()
# 同步执行:一个完了再跑下一个
for url in urls:
fetch_api_sync(url)
end_time = time.time()
print(f"同步总耗时:{end_time - start_time:.2f}秒")
main_sync()运行结果(总耗时≈6 秒):
开始请求:https://httpbin.org/get?name=1
完成请求:https://httpbin.org/get?name=1,状态码:200
开始请求:https://httpbin.org/get?name=2
完成请求:https://httpbin.org/get?name=2,状态码:200
开始请求:https://httpbin.org/get?name=3
完成请求:https://httpbin.org/get?name=3,状态码:200
同步总耗时:6.12秒用aiohttp和asyncio改写,让 3 个请求同时跑:
import aiohttp
import asyncio
import time
async def fetch_api_async(url, session):
"""异步调用API"""
print(f"开始请求:{url}")
# 异步请求:await会暂停,去跑其他请求
async with session.get(url) as response:
status = response.status
print(f"完成请求:{url},状态码:{status}")
return status
async def main_async():
urls = [
"https://httpbin.org/get?name=1",
"https://httpbin.org/get?name=2",
"https://httpbin.org/get?name=3"
]
# 创建aiohttp的会话(复用连接,效率高)
async with aiohttp.ClientSession() as session:
# 1. 把每个API请求包装成任务
tasks = [asyncio.create_task(fetch_api_async(url, session)) for url in urls]
# 2. 等待所有任务完成,收集结果
results = await asyncio.gather(*tasks) # *tasks是解包列表
print(f"所有请求结果:{results}")
start_time = time.time()
await main_async()
end_time = time.time()
print(f"异步总耗时:{end_time - start_time:.2f}秒")
asyncio.run(main_async())运行结果(总耗时≈2 秒):
开始请求:https://httpbin.org/get?name=1
开始请求:https://httpbin.org/get?name=2
开始请求:https://httpbin.org/get?name=3
完成请求:https://httpbin.org/get?name=2,状态码:200
完成请求:https://httpbin.org/get?name=1,状态码:200
完成请求:https://httpbin.org/get?name=3,状态码:200
所有请求结果:[200, 200, 200]
异步总耗时:2.05秒看到没?同样的任务,异步比同步快 3 倍!这就是异步的魅力。
咱们已经会跑多任务了,但实际开发中还需要管理任务:比如取消卡住的任务、处理超时、查任务状态。
如果一个任务跑太久(比如 API 卡住了),可以主动取消它。取消会抛出CancelledError,需要捕获。
import asyncio
async def stuck_task():
"""模拟一个卡住的任务(比如API一直不响应)"""
try:
print("任务开始,要跑10秒...")
await asyncio.sleep(10) # 模拟长时间运行
print("任务正常完成")
except asyncio.CancelledError:
# 捕获取消异常,做清理工作(比如关闭连接)
print("任务被取消了,做下清理")
raise # 可选:如果想让外层知道取消,就抛出
async def main():
# 创建任务
task = asyncio.create_task(stuck_task())
# 等3秒后取消任务
await asyncio.sleep(3)
print("准备取消任务")
task.cancel() # 取消任务
# 必须await被取消的任务,否则会有警告
try:
await task
except asyncio.CancelledError:
print("外层捕获到任务被取消")
asyncio.run(main())运行结果:
任务开始,要跑10秒...
(等待3秒)
准备取消任务
任务被取消了,做下清理
外层捕获到任务被取消如果想给任务设置 “超时时间”(比如 API 请求超过 5 秒就放弃),用asyncio.wait_for(),超时会抛出TimeoutError。
import asyncio
async def fetch_api(url):
"""模拟API请求,耗时3秒"""
await asyncio.sleep(3)
return f"{url} 响应成功"
async def main():
url = "https://api.example.com"
try:
# 设置超时2秒(比请求耗时短)
result = await asyncio.wait_for(fetch_api(url), timeout=2)
print(result)
except asyncio.TimeoutError:
print(f"请求{url}超时了!")
asyncio.run(main())运行结果:
请求https://api.example.com超时了!task.done():判断任务是否完成(返回 True/False)task.result():获取任务的返回结果(只能在任务完成后调用,否则报错)import asyncio
async def my_task():
await asyncio.sleep(2)
return "最终结果"
async def main():
task = asyncio.create_task(my_task())
# 任务没完成时查状态
print("任务是否完成?", task.done()) # False
# 此时调用result()会报错,因为任务没完成
# print(task.result()) # 会抛InvalidStateError
# 等待任务完成
await task
# 任务完成后查状态和结果
print("任务是否完成?", task.done()) # True
print("任务结果:", task.result()) # 最终结果
asyncio.run(main())刚才咱们跑了 3 个任务,现在挑战 1000 个任务!但要注意:如果同时发 1000 个请求,服务器可能会把你拉黑,所以需要用Semaphore(信号量)控制并发数。
Semaphore(n)可以限制 “同时运行的协程数”,比如Semaphore(10)就是最多同时跑 10 个任务。
import aiohttp
import asyncio
import time
async def fetch_api(url, semaphore):
"""带信号量的异步API请求"""
# async with semaphore:自动管理信号量(获取和释放)
async with semaphore:
print(f"请求中:{url}")
await asyncio.sleep(1) # 模拟API耗时1秒
print(f"完成:{url}")
return url
async def main():
# 1. 创建信号量,限制并发数10
semaphore = asyncio.Semaphore(10)
# 2. 生成1000个URL
urls = [f"https://api.example.com/{i}" for i in range(1000)]
# 3. 创建1000个任务
tasks = [
asyncio.create_task(fetch_api(url, semaphore))
for url in urls
]
# 4. 执行任务并计时
start_time = time.time()
results = await asyncio.gather(*tasks)
end_time = time.time()
# 5. 输出结果
print(f"n1000个任务全部完成!")
print(f"总耗时:{end_time - start_time:.2f}秒")
print(f"前10个任务结果:{results[:10]}")
asyncio.run(main())运行结果(总耗时≈100 秒,因为 1000/10=100 批,每批 1 秒):
请求中:https://api.example.com/0
请求中:https://api.example.com/1
...(同时最多10个请求)
完成:https://api.example.com/0
完成:https://api.example.com/1
...
1000个任务全部完成!
总耗时:100.52秒
前10个任务结果:['https://api.example.com/0', 'https://api.example.com/1', ...]⚠️ 注意:如果不用信号量,1000 个任务会同时发,服务器很可能返回 429(请求过于频繁),所以信号量是异步并发的 “安全阀”。
咱们平时写异步代码,很容易踩坑,这里总结几个高频问题:
现象:协程里的asyncio.sleep()或 API 请求没生效,代码直接跳过。
原因:调用协程或可等待对象时,没加await,协程没被暂停,直接往下跑。
例子:
# 错误代码
async def wrong_demo():
print("开始")
asyncio.sleep(2) # 忘记加await
print("结束") # 会直接打印,不会等2秒
asyncio.run(wrong_demo())解决:加上await:await asyncio.sleep(2)
现象:异步代码跑起来和同步一样慢,甚至更慢。
原因:在异步里用了同步库(如 requests、time.sleep、sqlite3),这些库会阻塞整个事件循环,其他协程没法跑。
例子:
# 错误代码(用了requests同步库)
import requests
import asyncio
async def fetch_wrong(url):
response = requests.get(url) # 同步请求,阻塞事件循环
return response.status_code
async def main():
urls = ["https://httpbin.org/get"]*3
await asyncio.gather(*[fetch_wrong(url) for url in urls]) # 实际是同步执行
asyncio.run(main())解决:用对应的异步库替代:
现象:报错RuntimeError: Event loop is closed。
原因:手动创建事件循环并关闭后,又尝试提交新任务。
例子:
# 错误代码
async def task():
await asyncio.sleep(1)
def wrong_loop():
loop = asyncio.get_event_loop()
loop.run_until_complete(task())
loop.close() # 关闭循环
# 关闭后再提交任务,报错
loop.run_until_complete(task())
wrong_loop()解决:用asyncio.run()自动管理循环,不要手动关闭循环;如果必须手动,确保关闭后不再用。
现象:报错RuntimeError: There is no current event loop in thread 'Thread-1'.
原因:asyncio 的事件循环是线程隔离的,新线程里没有默认循环,直接用会报错。
例子:
# 错误代码
import asyncio
import threading
async def my_coro():
await asyncio.sleep(1)
def thread_func():
# 新线程里直接跑协程,报错
asyncio.run(my_coro())
thread = threading.Thread(target=thread_func)
thread.start()解决:在新线程里手动创建并设置事件循环(Python 3.7 + 也可以用asyncio.run(),但部分环境可能有问题):
def thread_func():
# 手动创建并设置循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(my_coro())
loop.close()异步编程是 Python 面试的高频考点,这里整理几个常问题和大白话回答:
答:简单说有 3 个核心区别:
await时交出控制权,是 “协作式” 的,没切换开销。答:只能跟 “可等待对象”(Awaitable),主要有 3 种:
async def函数调用后的结果,比如await my_coro()。asyncio.create_task()包装的协程,比如task = asyncio.create_task(my_coro()),然后await task。asyncio.Future(),设置结果后await。aiohttp的session.get(),本质也是可等待对象。不能跟普通函数、数字、字符串这些,比如await 123会直接报错。
答:两者都能等多个任务完成,但用法不一样:
results = await asyncio.gather(task1, task2),results 0 是 task1 的结果,results 1 是 task2 的结果。比如想等任意一个任务完成:
done, pending = await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)答:主要有 3 种方式:
fetch_api()里捕TimeoutError。return_exceptions=True,这样某个任务报错不会影响其他任务,结果里会包含异常对象。task.result()会抛出异常,需要外层 try-except 捕获。例子(gather () 处理异常):
async def task1():
raise ValueError("任务1出错了")
async def task2():
return "任务2成功"
async def main():
# return_exceptions=True:让异常作为结果返回,不中断其他任务
results = await asyncio.gather(task1(), task2(), return_exceptions=True)
print(results) # [ValueError('任务1出错了'), '任务2成功']
asyncio.run(main())答:因为异步是单线程的!CPU 密集型任务需要一直占用 CPU 计算,没有await的机会,会让事件循环卡住,其他协程根本跑不了。
比如一个计算圆周率的 CPU 密集型任务:
async def calculate_pi():
# 模拟CPU密集计算(没有await)
result = 0
for i in range(10**8):
result += i
return result
async def main():
# 同时跑两个计算任务,但实际会排队执行,因为没有await
await asyncio.gather(calculate_pi(), calculate_pi())
asyncio.run(main())这两个任务会串行执行,总耗时是单个任务的 2 倍,异步没起到作用。
解决办法:CPU 密集型任务用loop.run_in_executor(),把任务交给线程池或进程池,不阻塞事件循环。
async def定义协程,await暂停协程,asyncio.run()运行协程。gather(),结果有序。task.cancel(),捕获CancelledError。wait_for(),捕获TimeoutError。Semaphore,避免服务器拉黑。await,不用同步库,注意线程隔离。只要记住:异步不是 “让代码变快”,而是 “让等待时间不浪费”—— 把等待的时间用来做其他事,整体效率自然就高了!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。