一个时间段
内,在一个CPU(CPU核心)能运行的程序的数量。同一时刻
,在多个CPU上运行多个程序,跟CPU(CPU核心)数量有关。因为
计算机CPU(CPU核心)在同一时刻只能运行一个程序。
CPU密集型(CPU-bound):
CPU密集型又叫做计算密集型,指I/O在很短时间就能完成,CPU需要大量的计算和处理,特点是CPU占用高。
例如:压缩解压缩、加密解密、正则表达式搜索。
IO密集型(I/O-bound):
IO密集型是指系统运行时大部分时间时CPU在等待IO操作(硬盘/内存)的读写操作,特点是CPU占用较低。
例如:文件读写、网络爬虫、数据库读写。
类型 | 优点 | 缺点 | 适用 |
---|---|---|---|
多进程Process(multiprocessing) | 可以利用CPU多核并行运算 | 占用资源最多可启动数目比线程少 | CPU密集型计算 |
多线程Thread(threading) | 相比进程更轻量占用资源少 | 相比进程,多线程只能并发执行,不能利用多CPU(GIL)相比协程启动数目有限制,占用内存资源有线程切换开销 | IO密集型计算、同时运行的任务要求不多 |
多协程Coroutine(asyncio) | 内存开销最少,启动协程数量最多 | 支持库的限制代码实现复杂 | IO密集型计算、同时运行的较多任务 |
GIL全称Global Interpreter Lock
下图为GIL的运行
Python的多线程是伪多线程,同时只能有一个线程运行。
一个进程能够启动N个线程,数量受系统限制。
一个线程能够启动N个协程,数量不受限制。
怎么选择
对于其他语言来说,多线程是能同时利用多CPU(核)的,所以是适用CPU密集型计算的,但是Python由于GIL的限制,只能使用IO密集型计算。 所以对于Python来说:
import asyncio
async def test():
await asyncio.sleep(3)
return "123"
async def main():
result = await test()
print(result)
if __name__ == '__main__':
asyncio.run(main())
import threading
import asyncio
async def myfun(index):
print(f'[{index}]({threading.currentThread().name})')
await asyncio.sleep(1)
return index
def getfuture(future):
print(future.result())
loop = asyncio.get_event_loop()
tasks = []
for item in range(3):
future = asyncio.ensure_future(myfun(item))
tasks.append(future)
future.add_done_callback(getfuture)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
import threading
import asyncio
async def myfun(index):
print(f'[{index}]({threading.currentThread().name})')
await asyncio.sleep(1)
loop = asyncio.get_event_loop()
tasks = [myfun(1), myfun(2)]
loop.run_until_complete(asyncio.wait(tasks))
#loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
asyncio.gather 和asyncio.wait区别:
在内部wait()使用一个set保存它创建的Task实例。因为set是无序的所以这也就是我们的任务不是顺序执行的原因。wait的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务。wait第二个参数为一个超时值 达到这个超时时间后,未完成的任务状态变为pending,当程序退出时还有任务没有完成此时就会看到如下的错误提示。
gather的使用 gather的作用和wait类似不同的是。
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
import requests
def myquery(url):
r = requests.get(url)
print(r.text)
return r.text
if __name__ == "__main__":
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]
tasks = []
start_time = time.time()
for url in urls:
task = loop.run_in_executor(executor, myquery, url)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
print(f"用时{time.time() - start_time}")
结果
{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}
{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}
用时0.11207175254821777
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import requests
def myquery(url):
print(f"请求所在线程:{threading.current_thread().name}")
r = requests.get(url)
return r.text
def myfuture(future):
print(f"回调所在线程:{threading.current_thread().name}")
print(future.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(3)
url = "https://www.psvmc.cn/userlist.json"
tasks = []
start_time = time.time()
task = loop.run_in_executor(executor, myquery, url)
future = asyncio.ensure_future(task)
future.add_done_callback(myfuture)
loop.run_until_complete(future)
print(f"用时{time.time() - start_time}")
from threading import Thread
def func(num):
return num
t = Thread(target=func, args=(100,))
t.start()
t.join()
import queue
q = queue.Queue()
q.put(1)
item = q.get()
from threading import Lock
lock = Lock()
with lock:
pass
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1, 2, 3])
# 方法2
future = executor.submit(func, 1)
result = future.result()
from concurrent.futures import ThreadPoolExecutor
import threading
import time
# 定义一个准备作为线程任务的函数
def action(num):
print(threading.current_thread().name)
time.sleep(num)
return num + 100
if __name__ == "__main__":
# 创建一个包含3条线程的线程池
with ThreadPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(action, 3)
future1.result()
print(f"单个任务返回:{future1.result()}")
print('------------------------------')
# 使用线程执行map计算
results = pool.map(action, (1, 3, 5))
for r in results:
print(f"多个任务返回:{r}")
结果
ThreadPoolExecutor-0_0
单个任务返回:103
------------------------------
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_2
多个任务返回:101
多个任务返回:103
多个任务返回:105
from multiprocessing import Process
def func(num):
return num
t = Process(target=func, args=(100,))
t.start()
t.join()
import multiprocessing
q = multiprocessing.Queue()
q.put(1)
item = q.get()
from multiprocessing import Lock
lock = Lock()
with lock:
pass
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
# 方法1
results = executor.map(func, [1, 2, 3])
# 方法2
future = executor.submit(func, 1)
result = future.result()
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import time
# 定义一个准备作为进程任务的函数
def action(num):
print(multiprocessing.current_process().name)
time.sleep(num)
return num + 100
if __name__ == "__main__":
# 创建一个包含3条进程的进程池
with ProcessPoolExecutor(max_workers=3) as pool:
future1 = pool.submit(action, 3)
future1.result()
print(f"单个任务返回:{future1.result()}")
print('------------------------------')
# 使用线程执行map计算
results = pool.map(action, [1, 3, 5])
for r in results:
print(f"多个任务返回:{r}")
结果
SpawnProcess-1
单个任务返回:103
------------------------------
SpawnProcess-2
SpawnProcess-3
SpawnProcess-1
多个任务返回:101
多个任务返回:103
多个任务返回:105
异步 IO(asyncio)、多进程(multiprocessing)、多线程(multithreading)
IO 密集型应用CPU等待IO时间远大于CPU 自身运行时间,太浪费;
常见的 IO 密集型业务包括:浏览器交互、磁盘请求、网络爬虫、数据库请求等
Python 世界对于 IO 密集型场景的并发提升有 3 种方法:多进程、多线程、多协程;
理论上讲asyncio是性能最高的,原因如下:
那asyncio的实际表现是否如理论上那么强,到底强多少呢?我构建了如下测试场景:
请求10此,并sleep 1s模拟业务查询
最后的asyncio+uvloop
和官方asyncio
最大不同是用 Cython+libuv
重新实现了asyncio
的事件循环(event loop)部分,
官方测试性能是 node.js的 2 倍,持平 golang。
import time
def query(num):
print(num)
time.sleep(1)
def main():
for h in range(10):
query(h)
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"时间差:{end_time-start_time}")
from concurrent import futures
import time
def query(num):
print(num)
time.sleep(1)
def main():
with futures.ProcessPoolExecutor() as executor:
for future in executor.map(query, range(10)):
pass
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"时间差:{end_time-start_time}")
from concurrent import futures
import time
def query(num):
print(num)
time.sleep(1)
def main():
with futures.ThreadPoolExecutor() as executor:
for future in executor.map(query, range(10)):
pass
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"时间差:{end_time-start_time}")
import asyncio
import time
async def query(num):
print(num)
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(query(num)) for num in range(10)]
await asyncio.gather(*tasks)
# main entrance
if __name__ == '__main__':
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
print(f"时间差:{end_time-start_time}")
注意
Windows上不支持uvloop。
示例
import asyncio
import uvloop
import time
async def query(num):
print(num)
await asyncio.sleep(1)
async def main():
tasks = [asyncio.create_task(query(host)) for host in range(10)]
await asyncio.gather(*tasks)
# main entrance
if __name__ == '__main__':
uvloop.install()
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
print(f"时间差:{end_time-start_time}")
方式 | 运行时间 |
---|---|
串行 | 10.0750972s |
多进程 | 1.1638731999999998s |
多线程 | 1.0146456s |
asyncio | 1.0110082s |
asyncio+uvloop | 1.01s |
可以看出: 无论多进程、多线程还是asyncio都能大幅提升IO 密集型场景下的并发,但asyncio+uvloop性能最高!