
咱做 Python 开发的,肯定遇到过这种糟心事儿:写了个爬虫用 requests 爬多个网站,结果代码跑起来像蜗牛 —— 一个网站卡着,其他全得等着;或者写文件处理脚本,读几个大文件也得挨个来,明明 CPU 闲着,就是没法并行。这其实都是「同步函数阻塞」搞的鬼!
今天就教你个绝招:用 Python 自带的asyncio.to_thread,一行代码把同步函数改成异步的,彻底解决 requests 这类同步模块的阻塞问题。尤其适合 IO 密集型任务(比如网络请求、文件读写、数据库操作),不用大改旧代码,效率直接翻倍!
在讲asyncio.to_thread之前,得先明白核心矛盾:同步函数会阻塞 asyncio 事件循环。
咱先用表格分清「同步」和「异步」的区别,后面理解起来更轻松:
类型 | 执行方式 | 是否阻塞事件循环 | 适用场景 | 典型库 |
|---|---|---|---|---|
同步函数 | 从头跑到尾,等 IO 完成才停 | 是 | 简单逻辑、非并发 | requests、os |
异步函数 | 遇到 IO 就 “暂停”,让给别人 | 否 | 高并发 IO 任务 | aiohttp、asyncio |
to_thread 包装的同步函数 | 同步函数放线程池跑,不占事件循环 | 否 | 已有同步代码想异步 | requests+to_thread |
举个具体例子:用 requests 爬 5 个网站,同步代码是这么跑的:
这期间,CPU 其实大部分时间都在 “摸鱼”—— 就等网络响应呢,但因为同步函数 “不放手”,事件循环被卡得动弹不得,其他任务根本没法跑。
而asyncio.to_thread的作用,就是把这个 “卡脖子” 的同步函数,扔到一个专门的「线程池」里去执行。这样事件循环就解放了,能同时处理多个同步函数的 IO 等待,5 个网站请求可能 2 秒多就跑完了!
asyncio.to_thread是 Python 3.9 版本才新增的功能,本质是「loop.run_in_executor的简化版」—— 不用自己手动获取事件循环、不用配置线程池,一行代码就能把同步函数变成可 await 的协程。
语法:await asyncio.to_thread(同步函数, 函数参数1, 函数参数2, ...)
解释下:
await:必须加!因为 to_thread 返回的是协程对象,得用 await 等它跑完同步函数:你要包装的纯同步函数(比如 requests.get、os.read)举个最小例子:包装一个简单的同步函数
import asyncio
# 这是个纯同步函数,没有任何async/await
def add(a, b):
print(f"计算 {a} + {b}")
return a + b
# 异步主函数
async def main():
# 一行代码包装同步函数,还能传参数
result = await asyncio.to_thread(add, 3, 5)
print(f"结果:{result}") # 输出:结果:8
# 运行异步代码
asyncio.run(main())这段代码能直接跑(Python 3.9+),你复制过去执行,肯定能得到结果。
ThreadPoolExecutor,asyncio 会自动维护一个默认线程池,用完还会自动回收,不用操心资源泄露loop.run_in_executor,后面会讲替代方案)咱拿最常见的「批量爬网站状态」举例,对比同步和异步的效率差异,看完你就知道 to_thread 多香了!
用 requests 逐个请求网站,看看耗时多少:
import requests
import time
# 同步函数:获取网站状态(成功返回状态码,失败返回错误信息)
def get_website_status(url, timeout=5):
try:
# 同步请求:这里会阻塞,直到拿到响应
response = requests.get(url, timeout=timeout)
return {
"url": url,
"status_code": response.status_code,
"result": "成功",
"error_msg": ""
}
except Exception as e:
return {
"url": url,
"status_code": None,
"result": "失败",
"error_msg": str(e)
}
if __name__ == "__main__":
# 要爬的5个网站(包含国内和国外的,差异更明显)
target_urls = [
"https://www.baidu.com",
"https://www.taobao.com",
"https://juejin.cn",
"https://github.com",
"https://stackoverflow.com"
]
# 同步执行:逐个请求
start_time = time.time()
results = [get_website_status(url) for url in target_urls]
end_time = time.time()
# 打印结果
print("=== 同步执行结果 ===")
for res in results:
print(f"URL: {res['url']:20} | 状态码: {res['status_code']:4} | 结果: {res['result']} | 错误: {res['error_msg']}")
print(f"n同步总耗时:{end_time - start_time:.2f} 秒")=== 同步执行结果 ===
URL: https://www.baidu.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://www.taobao.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://juejin.cn | 状态码: 200 | 结果: 成功 | 错误:
URL: https://github.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://stackoverflow.com | 状态码: 200 | 结果: 成功 | 错误:
同步总耗时:8.76 秒看到没?同步跑要 8 秒多,因为每个请求都得等前一个完成。
就改 3 处地方,同步函数一行不动!:
import asyncio
import requests
import time
# 【原封不动的同步函数】
def get_website_status(url, timeout=5):
try:
response = requests.get(url, timeout=timeout)
return {
"url": url,
"status_code": response.status_code,
"result": "成功",
"error_msg": ""
}
except Exception as e:
return {
"url": url,
"status_code": None,
"result": "失败",
"error_msg": str(e)
}
# 【新增1:用to_thread包装同步函数,变成异步函数】
async def async_wrap_status(url, timeout=5):
# 一行代码!把同步函数扔到线程池
return await asyncio.to_thread(get_website_status, url, timeout)
# 【新增2:异步主函数,负责并发调度】
async def main():
target_urls = [
"https://www.baidu.com",
"https://www.taobao.com",
"https://juejin.cn",
"https://github.com",
"https://stackoverflow.com"
]
# 1. 创建所有异步任务(不会立即执行,只是准备好)
tasks = [async_wrap_status(url) for url in target_urls]
# 2. 并发执行所有任务,等待全部完成(这步是核心!)
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
# 【修改3:用asyncio.run运行异步主函数】
start_time = time.time()
results = asyncio.run(main()) # 关键:启动事件循环
end_time = time.time()
# 打印结果
print("=== 异步执行结果 ===")
for res in results:
print(f"URL: {res['url']:20} | 状态码: {res['status_code']:4} | 结果: {res['result']} | 错误: {res['error_msg']}")
print(f"n异步总耗时:{end_time - start_time:.2f} 秒")=== 异步执行结果 ===
URL: https://www.baidu.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://www.taobao.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://juejin.cn | 状态码: 200 | 结果: 成功 | 错误:
URL: https://github.com | 状态码: 200 | 结果: 成功 | 错误:
URL: https://stackoverflow.com | 状态码: 200 | 结果: 成功 | 错误:
异步总耗时:2.13 秒天呐!从 8 秒多降到 2 秒多,效率直接翻 4 倍!这就是异步并发的魔力 —— 所有请求同时发,等响应的时候互不耽误。
async_wrap_status函数: 这是个 “中间层”,用asyncio.to_thread把同步的get_website_status包装成异步函数。
注意:to_thread后面的参数,就是同步函数需要的参数(url、timeout),直接按顺序传就行。
asyncio.gather(*tasks):这是 asyncio 里负责 “并发执行多个协程” 的工具。
tasks)收集起来,用*解包(因为 gather 需要逐个传参);await gather会等待所有任务都完成,然后返回一个结果列表,顺序和任务列表一致(比如第一个任务对应第一个结果)。asyncio.run(main()):这是 Python 3.7 + 启动事件循环的简化写法,不用自己手动创建和管理 loop,一行搞定。
咱实际开发中,肯定会遇到各种小问题,这里把最常见的坑列出来,每个都给解决方案:
症状:异步代码跑起来和同步差不多慢,甚至更慢。
原因 & 解决:
Python 有个 GIL 锁(全局解释器锁),同一时间只能有一个线程执行 Python 代码。如果函数一直在计算(比如算 100 万次平方),线程会一直占着 GIL,其他线程没法跑,所以并发没用。
→ 解决方案:CPU 密集型任务用asyncio.to_process(Python 3.12+)或multiprocessing模块,用多进程而不是多线程。
await: 比如get_website_status里写了await async_func(),这会报错!因为同步函数不能有await。
→ 解决方案:to_thread 只能包装「纯同步函数」,同步函数里不能有任何异步代码。
症状:比如某个网站无法访问(抛异常),gather会直接报错,其他成功的结果也拿不到。
解决:给单个任务加异常捕获,或者用asyncio.gather(return_exceptions=True)。
async def async_wrap_status(url, timeout=5):
try:
# 给to_thread加try-except,捕获单个任务的错误
return await asyncio.to_thread(get_website_status, url, timeout)
except Exception as e:
# 即使单个任务失败,也返回统一格式的结果,不影响其他任务
return {
"url": url,
"status_code": None,
"result": "任务执行失败",
"error_msg": f"to_thread异常: {str(e)}"
}return_exceptions=True(简单粗暴):# gather参数加return_exceptions=True,失败的任务会返回异常对象,不崩溃
results = await asyncio.gather(*tasks, return_exceptions=True)
# 后续处理结果时,判断是否是异常
for res in results:
if isinstance(res, Exception):
print(f"任务失败: {str(res)}")
else:
print(f"成功: {res}")症状:运行报错AttributeError: module 'asyncio' has no attribute 'to_thread'。
原因:to_thread 是 Python 3.9 才加的,你用的是 3.8 及以下版本。
解决方案:
loop.run_in_executor(to_thread 的底层就是这个):async def async_wrap_status(url, timeout=5):
# 1. 获取当前事件循环
loop = asyncio.get_running_loop()
# 2. 用run_in_executor,参数和to_thread差不多(第一个参数传None=用默认线程池)
return await loop.run_in_executor(None, get_website_status, url, timeout)效果和 to_thread 一样,就是多写一行获取 loop 的代码。
很多人会问:既然 aiohttp 是原生异步的网络库,为啥还要用 to_thread+requests?
咱用表格对比,看场景选:
对比维度 | asyncio.to_thread + requests | aiohttp(原生异步) |
|---|---|---|
代码改造量 | 几乎为 0(同步函数不动) | 大(得重写所有请求逻辑) |
性能 | 略差(线程切换有少量开销) | 更好(纯异步,无线程切换) |
学习成本 | 低(懂基础 async/await 就行) | 高(得学 aiohttp 的 API) |
适用场景 | 旧同步代码快速异步化、小项目 | 新项目、高并发需求(比如每秒上千请求) |
依赖 | 只依赖 requests(大部分项目已有) | 需额外装 aiohttp(pip install aiohttp) |
结论:
关于 asyncio.to_thread 的面试题,基本逃不出这几个,每个都给你准备好大白话答案:
答案:
to_thread 其实是把同步函数 “扔” 到 asyncio 管理的默认线程池里执行。
事件循环在遇到await to_thread(...)时,不会阻塞自己,而是去处理其他任务;等线程池里的同步函数执行完(比如 requests 拿到响应),事件循环再把结果拿回来,继续执行后续代码。
简单说:让同步函数在 “后台线程” 跑,不耽误事件循环干别的。
答案:
不适合!
因为 Python 有 GIL 锁,同一时间只能有一个线程执行 Python 代码。如果是 CPU 密集型任务(比如大量计算),线程会一直占着 GIL,其他线程根本没机会跑,就算用了 to_thread,也还是串行执行,甚至因为线程切换多了开销,反而更慢。
CPU 密集型任务应该用多进程,比如 Python 3.12 + 的asyncio.to_process,或者multiprocessing模块,绕开 GIL 锁。
答案:
看情况,但优先选 to_thread:
答案:
用loop.run_in_executor,这是 to_thread 的底层实现。步骤是:
asyncio.get_running_loop()获取当前事件循环;loop.run_in_executor(executor, func, *args),第一个参数传 None(用默认线程池),第二个是同步函数,后面是函数参数;比如:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, get_website_status, "https://baidu.com")答案:
因为 requests 是纯同步库,它的get方法会一直 “卡” 着,直到拿到服务器响应(或超时)。
在这个等待过程中,requests 不会主动 “释放” 事件循环的控制权,导致事件循环没法去处理其他协程,只能眼睁睁等着 requests 执行完,这就是 “阻塞”。
而 to_thread 把 requests 放到线程池里跑,事件循环就自由了,能去处理别的任务,所以不阻塞。
最后给大家画个重点,确保你看完就能用:
「同步代码想异步,IO 密集别犹豫,3.9 + 用 to_thread,一行代码解难题!」
好了,这篇文章把 asyncio.to_thread 的方方面面都讲透了,从原理到实战,从踩坑到面试,每个知识点都能用得上。你可以直接把案例代码复制到本地运行,感受下异步带来的效率提升。如果还有其他疑问,比如想优化数据库同步代码,或者处理文件异步读写,都可以再深入探讨!
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。