首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >一文学会Python 异步编程!

一文学会Python 异步编程!

原创
作者头像
小白的大数据之旅
发布2025-12-03 10:51:40
发布2025-12-03 10:51:40
5740
举报

一文学会 Python 异步编程!

咱们先聊个实际问题:你有没有写过这样的代码 —— 循环调用 10 个 API,每个 API 要等 2 秒,同步跑下来要 20 秒?要是能让这些请求 “同时跑”,2 秒就搞定多好?这就是异步编程要解决的事儿!

今天咱们就用 Python 自带的asyncio库,从基础到实战,把异步编程彻底搞明白。所有代码都能直接复制运行,遇到复杂点我会用比喻讲,保证不绕弯子。

一、先搞懂:啥是异步编程?为啥要用它?

咱们先对比两个生活场景,理解 “同步” 和 “异步” 的区别:

场景

同步做法

异步做法

去餐厅吃饭

点完菜就坐着等,菜上了才动筷子

点完菜先玩会儿手机,服务员喊 “菜好了” 再去吃

调用 API

发一个请求,等结果回来再发下一个

发完请求先去处理别的,结果回来再接着处理

核心结论:

  • 同步:做事要 “排队”,上一件做完才能做下一件,遇到等待(比如等 API 响应、等文件读取)就傻等
  • 异步:做事不用 “死等”,遇到等待就先去做别的,等前面的 “等待结束” 再回头处理

啥时候用异步?

只适合IO 密集型任务(比如调用 API、读文件、数据库操作)—— 这些任务的大部分时间都在 “等”(等网络响应、等磁盘读写)。

如果是CPU 密集型任务(比如算圆周率、图像处理),异步没用!因为 CPU 一直在干活,没空想别的,这时候得用多进程。

二、异步编程基础:先学会写 “协程”

异步编程的核心是 “协程”(可以理解成 “能暂停的函数”),写协程要用到两个关键字:asyncawait

1. 第一步:用 async 定义协程函数

协程函数和普通函数的区别,就是多了个async关键字:

代码语言:python
复制
# 这是一个协程函数

async def say_hello():

   print("Hello, 异步!")

   # 模拟“等待”(比如等API响应),用asyncio.sleep,不能用time.sleep!

   await asyncio.sleep(1)  # 关键:await会让协程“暂停”,去干别的

   print("Hello, 又回来了!")

⚠️ 注意:普通的time.sleep(1)会阻塞整个程序,而asyncio.sleep(1)是 “异步等待”,会让协程暂停,同时事件循环能去执行其他协程。

2. 第二步:运行协程

协程函数调用后不会直接执行,会返回一个 “协程对象”,必须交给asyncio的 “事件循环” 才能跑起来。

最简单的运行方式是用asyncio.run()(Python 3.7 + 支持):

代码语言:python
复制
import asyncio

async def say_hello():

   print("Hello, 异步!")

   await asyncio.sleep(1)

   print("Hello, 又回来了!")

# 运行协程(asyncio.run会自动创建和管理事件循环)

asyncio.run(say_hello())

运行结果:

代码语言:python
复制
Hello, 异步!

(等待1秒)

Hello, 又回来了!

3. await 是干啥的?

await的作用是 “暂停当前协程,等待另一个可等待对象(比如协程、任务)完成”。

你可以理解成:“我现在要等个结果,这段时间别管我,先去处理别人的事儿,等结果好了再叫我”。

比如两个协程配合:

代码语言:python
复制
import asyncio

async def wash_dishes():

   print("开始洗碗,要2秒")

   await asyncio.sleep(2)  # 洗碗时暂停,去干别的

   print("碗洗完了")

async def cook_rice():

   print("开始煮饭,要3秒")

   await asyncio.sleep(3)  # 煮饭时暂停,去干别的

   print("饭煮好了")

async def main():

   # 同时执行两个协程(不是排队等)

   await asyncio.gather(wash_dishes(), cook_rice())  # gather是“收集”多个协程

asyncio.run(main())

运行结果(总耗时 3 秒,不是 2+3=5 秒):

代码语言:python
复制
开始洗碗,要2秒

开始煮饭,要3秒

(等待2秒后)碗洗完了

(再等1秒后)饭煮好了

三、asyncio 核心组件:事件循环和任务

刚才咱们用了asyncio.run()gather(),背后其实是两个核心东西:事件循环任务

1. 事件循环:异步的 “调度员”

事件循环就像餐厅的 “调度员”,负责:

  1. 管理所有协程的执行顺序
  2. 当一个协程await时,把它挂起,去执行其他能跑的协程
  3. await的任务完成后,再把挂起的协程拉回来继续跑

你不用手动创建事件循环(asyncio.run()会帮你做),但得知道它的存在 —— 异步所有逻辑都靠它驱动。

2. 协程 vs 任务:别搞混了

很多人会把 “协程对象” 和 “任务对象” 搞混,咱们用表格分清:

类型

定义

创建方式

特点

用途

协程对象

async def 函数调用后得到的对象

coro = wash_dishes()

不能直接运行,必须靠 await 或任务驱动

封装单个异步逻辑

任务对象

把协程包装成 “可调度的任务”

task = asyncio.create_task(coro)

会被事件循环自动调度,能跟踪状态(是否完成、结果是什么)

管理异步任务(取消、查状态)

简单说:任务是 “可管理的协程”,能主动取消、查结果,而协程不行。

比如创建任务并查状态:

代码语言:python
复制
import asyncio

async def my_task():

   await asyncio.sleep(2)

   return "任务完成!"

async def main():

   # 1. 创建协程对象

   coro = my_task()

   print("协程对象类型:", type(coro))  # <class 'coroutine'>

   # 2. 把协程包装成任务(会自动加入事件循环)

   task = asyncio.create_task(coro)

   print("任务对象类型:", type(task))  # <class '_asyncio.Task'>

   # 3. 查任务状态

   print("任务是否完成?", task.done())  # False(刚创建,还在跑)

   # 4. 等待任务完成,获取结果

   result = await task

   print("任务结果:", result)  # 任务完成!

   print("任务是否完成?", task.done())  # True(已经跑完)

asyncio.run(main())

四、实战 1:模拟 API 请求,看异步多快

咱们用最常见的 “调用 API” 场景,对比同步和异步的效率差异。

需要先装个处理异步 HTTP 请求的库:pip install aiohttp(普通的 requests 库是同步的,不能用在异步里)。

1. 同步请求:排队等,慢!

假设要调用 3 个 API,每个要 2 秒:

代码语言:python
复制
import requests

import time

def fetch_api_sync(url):

   """同步调用API"""

   print(f"开始请求:{url}")

   response = requests.get(url)  # 同步等待,会阻塞

   print(f"完成请求:{url},状态码:{response.status_code}")

   return response.status_code

def main_sync():

   urls = [

       "https://httpbin.org/get?name=1",

       "https://httpbin.org/get?name=2",

       "https://httpbin.org/get?name=3"

   ]

   start_time = time.time()

   # 同步执行:一个完了再跑下一个

   for url in urls:

       fetch_api_sync(url)

   end_time = time.time()

   print(f"同步总耗时:{end_time - start_time:.2f}秒")

main_sync()

运行结果(总耗时≈6 秒):

代码语言:python
复制
开始请求:https://httpbin.org/get?name=1

完成请求:https://httpbin.org/get?name=1,状态码:200

开始请求:https://httpbin.org/get?name=2

完成请求:https://httpbin.org/get?name=2,状态码:200

开始请求:https://httpbin.org/get?name=3

完成请求:https://httpbin.org/get?name=3,状态码:200

同步总耗时:6.12秒

2. 异步请求:同时跑,快!

aiohttpasyncio改写,让 3 个请求同时跑:

代码语言:python
复制
import aiohttp

import asyncio

import time

async def fetch_api_async(url, session):

   """异步调用API"""

   print(f"开始请求:{url}")

   # 异步请求:await会暂停,去跑其他请求

   async with session.get(url) as response:

       status = response.status

       print(f"完成请求:{url},状态码:{status}")

       return status

async def main_async():

   urls = [

       "https://httpbin.org/get?name=1",

       "https://httpbin.org/get?name=2",

       "https://httpbin.org/get?name=3"

   ]

   # 创建aiohttp的会话(复用连接,效率高)

   async with aiohttp.ClientSession() as session:

       # 1. 把每个API请求包装成任务

       tasks = [asyncio.create_task(fetch_api_async(url, session)) for url in urls]

       # 2. 等待所有任务完成,收集结果

       results = await asyncio.gather(*tasks)  # *tasks是解包列表

       print(f"所有请求结果:{results}")

   start_time = time.time()

   await main_async()

   end_time = time.time()

   print(f"异步总耗时:{end_time - start_time:.2f}秒")

asyncio.run(main_async())

运行结果(总耗时≈2 秒):

代码语言:python
复制
开始请求:https://httpbin.org/get?name=1

开始请求:https://httpbin.org/get?name=2

开始请求:https://httpbin.org/get?name=3

完成请求:https://httpbin.org/get?name=2,状态码:200

完成请求:https://httpbin.org/get?name=1,状态码:200

完成请求:https://httpbin.org/get?name=3,状态码:200

所有请求结果:[200, 200, 200]

异步总耗时:2.05秒

看到没?同样的任务,异步比同步快 3 倍!这就是异步的魅力。

五、核心技巧:任务取消、超时处理、状态检查

咱们已经会跑多任务了,但实际开发中还需要管理任务:比如取消卡住的任务、处理超时、查任务状态。

1. 任务取消:task.cancel ()

如果一个任务跑太久(比如 API 卡住了),可以主动取消它。取消会抛出CancelledError,需要捕获。

代码语言:python
复制
import asyncio

async def stuck_task():

   """模拟一个卡住的任务(比如API一直不响应)"""

   try:

       print("任务开始,要跑10秒...")

       await asyncio.sleep(10)  # 模拟长时间运行

       print("任务正常完成")

   except asyncio.CancelledError:

       # 捕获取消异常,做清理工作(比如关闭连接)

       print("任务被取消了,做下清理")

       raise  # 可选:如果想让外层知道取消,就抛出

async def main():

   # 创建任务

   task = asyncio.create_task(stuck_task())

   # 等3秒后取消任务

   await asyncio.sleep(3)

   print("准备取消任务")

   task.cancel()  # 取消任务

   # 必须await被取消的任务,否则会有警告

   try:

       await task

   except asyncio.CancelledError:

       print("外层捕获到任务被取消")

asyncio.run(main())

运行结果:

代码语言:python
复制
任务开始,要跑10秒...

(等待3秒)

准备取消任务

任务被取消了,做下清理

外层捕获到任务被取消

2. 超时处理:asyncio.wait_for ()

如果想给任务设置 “超时时间”(比如 API 请求超过 5 秒就放弃),用asyncio.wait_for(),超时会抛出TimeoutError

代码语言:python
复制
import asyncio

async def fetch_api(url):

   """模拟API请求,耗时3秒"""

   await asyncio.sleep(3)

   return f"{url} 响应成功"

async def main():

   url = "https://api.example.com"

   try:

       # 设置超时2秒(比请求耗时短)

       result = await asyncio.wait_for(fetch_api(url), timeout=2)

       print(result)

   except asyncio.TimeoutError:

       print(f"请求{url}超时了!")

asyncio.run(main())

运行结果:

代码语言:python
复制
请求https://api.example.com超时了!

3. 状态检查:task.done () 和 task.result ()

  • task.done():判断任务是否完成(返回 True/False)
  • task.result():获取任务的返回结果(只能在任务完成后调用,否则报错)
代码语言:python
复制
import asyncio

async def my_task():

   await asyncio.sleep(2)

   return "最终结果"

async def main():

   task = asyncio.create_task(my_task())

  

   # 任务没完成时查状态

   print("任务是否完成?", task.done())  # False

   # 此时调用result()会报错,因为任务没完成

   # print(task.result())  # 会抛InvalidStateError

   # 等待任务完成

   await task

  

   # 任务完成后查状态和结果

   print("任务是否完成?", task.done())  # True

   print("任务结果:", task.result())  # 最终结果

asyncio.run(main())

六、实战 2:千任务并发,还能控制速度

刚才咱们跑了 3 个任务,现在挑战 1000 个任务!但要注意:如果同时发 1000 个请求,服务器可能会把你拉黑,所以需要用Semaphore(信号量)控制并发数。

1. 千任务并发 + 信号量控制

Semaphore(n)可以限制 “同时运行的协程数”,比如Semaphore(10)就是最多同时跑 10 个任务。

代码语言:python
复制
import aiohttp

import asyncio

import time

async def fetch_api(url, semaphore):

   """带信号量的异步API请求"""

   # async with semaphore:自动管理信号量(获取和释放)

   async with semaphore:

       print(f"请求中:{url}")

       await asyncio.sleep(1)  # 模拟API耗时1秒

       print(f"完成:{url}")

       return url

async def main():

   # 1. 创建信号量,限制并发数10

   semaphore = asyncio.Semaphore(10)

   # 2. 生成1000个URL

   urls = [f"https://api.example.com/{i}" for i in range(1000)]

   # 3. 创建1000个任务

   tasks = [

       asyncio.create_task(fetch_api(url, semaphore))

       for url in urls

   ]

   # 4. 执行任务并计时

   start_time = time.time()

   results = await asyncio.gather(*tasks)

   end_time = time.time()

   # 5. 输出结果

   print(f"n1000个任务全部完成!")

   print(f"总耗时:{end_time - start_time:.2f}秒")

   print(f"前10个任务结果:{results[:10]}")

asyncio.run(main())

运行结果(总耗时≈100 秒,因为 1000/10=100 批,每批 1 秒):

代码语言:python
复制
请求中:https://api.example.com/0

请求中:https://api.example.com/1

...(同时最多10个请求)

完成:https://api.example.com/0

完成:https://api.example.com/1

...

1000个任务全部完成!

总耗时:100.52秒

前10个任务结果:['https://api.example.com/0', 'https://api.example.com/1', ...]

⚠️ 注意:如果不用信号量,1000 个任务会同时发,服务器很可能返回 429(请求过于频繁),所以信号量是异步并发的 “安全阀”。

七、常见问题和错误:踩过的坑都在这

咱们平时写异步代码,很容易踩坑,这里总结几个高频问题:

1. 坑 1:忘记加 await,协程不执行

现象:协程里的asyncio.sleep()或 API 请求没生效,代码直接跳过。

原因:调用协程或可等待对象时,没加await,协程没被暂停,直接往下跑。

例子

代码语言:python
复制
# 错误代码

async def wrong_demo():

   print("开始")

   asyncio.sleep(2)  # 忘记加await

   print("结束")  # 会直接打印,不会等2秒

asyncio.run(wrong_demo())

解决:加上awaitawait asyncio.sleep(2)

2. 坑 2:用了同步库(比如 requests),阻塞事件循环

现象:异步代码跑起来和同步一样慢,甚至更慢。

原因:在异步里用了同步库(如 requests、time.sleep、sqlite3),这些库会阻塞整个事件循环,其他协程没法跑。

例子

代码语言:python
复制
# 错误代码(用了requests同步库)

import requests

import asyncio

async def fetch_wrong(url):

   response = requests.get(url)  # 同步请求,阻塞事件循环

   return response.status_code

async def main():

   urls = ["https://httpbin.org/get"]*3

   await asyncio.gather(*[fetch_wrong(url) for url in urls])  # 实际是同步执行

asyncio.run(main())

解决:用对应的异步库替代:

  • requests → aiohttp
  • time.sleep → asyncio.sleep
  • sqlite3 → aiosqlite

3. 坑 3:事件循环关闭后,还提交任务

现象:报错RuntimeError: Event loop is closed

原因:手动创建事件循环并关闭后,又尝试提交新任务。

例子

代码语言:python
复制
# 错误代码

async def task():

   await asyncio.sleep(1)

def wrong_loop():

   loop = asyncio.get_event_loop()

   loop.run_until_complete(task())

   loop.close()  # 关闭循环

   # 关闭后再提交任务,报错

   loop.run_until_complete(task())

wrong_loop()

解决:用asyncio.run()自动管理循环,不要手动关闭循环;如果必须手动,确保关闭后不再用。

4. 坑 4:多线程里用 asyncio,没指定事件循环

现象:报错RuntimeError: There is no current event loop in thread 'Thread-1'.

原因:asyncio 的事件循环是线程隔离的,新线程里没有默认循环,直接用会报错。

例子

代码语言:python
复制
# 错误代码

import asyncio

import threading

async def my_coro():

   await asyncio.sleep(1)

def thread_func():

   # 新线程里直接跑协程,报错

   asyncio.run(my_coro())

thread = threading.Thread(target=thread_func)

thread.start()

解决:在新线程里手动创建并设置事件循环(Python 3.7 + 也可以用asyncio.run(),但部分环境可能有问题):

代码语言:python
复制
def thread_func():

   # 手动创建并设置循环

   loop = asyncio.new_event_loop()

   asyncio.set_event_loop(loop)

   loop.run_until_complete(my_coro())

   loop.close()

八、面试常问:这些问题要会答

异步编程是 Python 面试的高频考点,这里整理几个常问题和大白话回答:

1. 问:异步编程和多线程有啥区别?

:简单说有 3 个核心区别:

  • 线程是 “操作系统调度”,异步是 “自己调度”:线程切换由操作系统控制,可能随时打断;异步协程是自己在await时交出控制权,是 “协作式” 的,没切换开销。
  • 线程有 GIL 限制,异步没有:Python 的多线程因为 GIL(全局解释器锁),同一时间只能跑一个线程,CPU 密集型任务效率低;异步是单线程,不用等 GIL,IO 密集型任务效率高。
  • 适用场景不同:异步适合 IO 密集型(请求 API、读文件),多线程适合 CPU 密集型(但要配合多进程)。

2. 问:await 后面能跟什么东西?

:只能跟 “可等待对象”(Awaitable),主要有 3 种:

  • 协程对象:就是async def函数调用后的结果,比如await my_coro()
  • Task 对象:用asyncio.create_task()包装的协程,比如task = asyncio.create_task(my_coro()),然后await task
  • Future 对象:更底层的 “未来结果”,平时用得少,比如asyncio.Future(),设置结果后await
  • 其他:比如aiohttpsession.get(),本质也是可等待对象。

不能跟普通函数、数字、字符串这些,比如await 123会直接报错。

3. 问:asyncio.gather () 和 asyncio.wait () 有啥区别?

:两者都能等多个任务完成,但用法不一样:

  • gather ():更常用,能收集任务结果,结果顺序和任务创建顺序一致。比如results = await asyncio.gather(task1, task2),results 0 是 task1 的结果,results 1 是 task2 的结果。
  • wait ():更灵活,能设置等待方式(比如等任意一个任务完成就返回),返回的是 “已完成任务” 和 “未完成任务” 的集合,需要自己提取结果。

比如想等任意一个任务完成:

代码语言:python
复制
done, pending = await asyncio.wait([task1, task2], return_when=asyncio.FIRST_COMPLETED)

4. 问:怎么处理异步任务的异常?

:主要有 3 种方式:

  • 单个任务:在协程内部用 try-except 捕获,比如 API 请求超时,在fetch_api()里捕TimeoutError
  • 多个任务用 gather ():可以在 gather () 里加return_exceptions=True,这样某个任务报错不会影响其他任务,结果里会包含异常对象。
  • 单个任务用 task.result ():任务完成后,调用task.result()会抛出异常,需要外层 try-except 捕获。

例子(gather () 处理异常):

代码语言:python
复制
async def task1():

   raise ValueError("任务1出错了")

async def task2():

   return "任务2成功"

async def main():

   # return_exceptions=True:让异常作为结果返回,不中断其他任务

   results = await asyncio.gather(task1(), task2(), return_exceptions=True)

   print(results)  # [ValueError('任务1出错了'), '任务2成功']

asyncio.run(main())

5. 问:为什么异步不适合 CPU 密集型任务?

:因为异步是单线程的!CPU 密集型任务需要一直占用 CPU 计算,没有await的机会,会让事件循环卡住,其他协程根本跑不了。

比如一个计算圆周率的 CPU 密集型任务:

代码语言:python
复制
async def calculate_pi():

   # 模拟CPU密集计算(没有await)

   result = 0

   for i in range(10**8):

       result += i

   return result

async def main():

   # 同时跑两个计算任务,但实际会排队执行,因为没有await

   await asyncio.gather(calculate_pi(), calculate_pi())

asyncio.run(main())

这两个任务会串行执行,总耗时是单个任务的 2 倍,异步没起到作用。

解决办法:CPU 密集型任务用loop.run_in_executor(),把任务交给线程池或进程池,不阻塞事件循环。

九、总结:异步编程要点回顾

  1. 核心目标:解决 IO 密集型任务的 “等待” 问题,提升效率。
  2. 基础语法:用async def定义协程,await暂停协程,asyncio.run()运行协程。
  3. 核心组件:事件循环(调度员)、任务(可管理的协程)、信号量(控制并发)。
  4. 关键技巧
  • 多任务用gather(),结果有序。
  • 取消任务用task.cancel(),捕获CancelledError
  • 超时用wait_for(),捕获TimeoutError
  • 控制并发用Semaphore,避免服务器拉黑。
  1. 避坑指南:别忘加await,不用同步库,注意线程隔离。

只要记住:异步不是 “让代码变快”,而是 “让等待时间不浪费”—— 把等待的时间用来做其他事,整体效率自然就高了!

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一文学会 Python 异步编程!
    • 一、先搞懂:啥是异步编程?为啥要用它?
      • 核心结论:
      • 啥时候用异步?
    • 二、异步编程基础:先学会写 “协程”
      • 1. 第一步:用 async 定义协程函数
      • 2. 第二步:运行协程
      • 3. await 是干啥的?
    • 三、asyncio 核心组件:事件循环和任务
      • 1. 事件循环:异步的 “调度员”
      • 2. 协程 vs 任务:别搞混了
    • 四、实战 1:模拟 API 请求,看异步多快
      • 1. 同步请求:排队等,慢!
      • 2. 异步请求:同时跑,快!
    • 五、核心技巧:任务取消、超时处理、状态检查
      • 1. 任务取消:task.cancel ()
      • 2. 超时处理:asyncio.wait_for ()
      • 3. 状态检查:task.done () 和 task.result ()
    • 六、实战 2:千任务并发,还能控制速度
      • 1. 千任务并发 + 信号量控制
    • 七、常见问题和错误:踩过的坑都在这
      • 1. 坑 1:忘记加 await,协程不执行
      • 2. 坑 2:用了同步库(比如 requests),阻塞事件循环
      • 3. 坑 3:事件循环关闭后,还提交任务
      • 4. 坑 4:多线程里用 asyncio,没指定事件循环
    • 八、面试常问:这些问题要会答
      • 1. 问:异步编程和多线程有啥区别?
      • 2. 问:await 后面能跟什么东西?
      • 3. 问:asyncio.gather () 和 asyncio.wait () 有啥区别?
      • 4. 问:怎么处理异步任务的异常?
      • 5. 问:为什么异步不适合 CPU 密集型任务?
    • 九、总结:异步编程要点回顾
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档