导读:我很笨,但是我很快——计算机之所以计算能力如此出众,不在于其有多智能,而是因为它超快的执行速度,而多核心则可以进一步成倍的提高效率。在python中,concurrent库就是用于完成并发的模块之一。
01 初识concurrent
concurrent库是python内置模块之一,基于threading和multiprocessing两个模块实现,并对二者进行了很好的封装和集成,使其拥有更加简洁易用的接口函数,无需再考虑start()、join()、lock()等问题。
打开concurrent模块(默认安装位于..\Python\Python37\Lib),发现当前其仅内置了一个futures子模块,而futures子模块中,则有3个重要的.py文件,其中_base.py是最主要的模块,提供了大部分并发功能,但属于私有模块,不能被其他程序直接import,另外两个则是process和thread模块,即多进程和多线程,二者均调用_base实现主要并发接口函数。
concurrent英文原义为"并发的",futures英文原义为"未来",模块取名concurrent很好理解(java中有同名包),而子模块取名futures则用以表示未来有待完成的任务,似乎也正体现了多线程/多进程中任务队列的含义。
注:关于多线程和多进程的理解和区别本文不予展开,网上有很多通俗易懂的讲解可供查找学习。
02 Executor
Executor是concurrent.futures模块的抽象类,但一般不直接调用,而是为线程池和进程池提供了一个父类,即ThreadPoolExecutor和ProcessPoolExecutor均继承自Executor。
Executor虽然不直接调用,但却提供了几个非常重要的接口供其子类继承
Executor的这几个方法中,submit()和map()也是ThreadPoolExecutor和ProcessPoolExecutor两个子类的常用方法。
另外,与Executor同在_base.py模块中定义的还有future类(调度并发任务后生成对象,用于获取单个任务信息)、wait()方法(其功能类似利用threading模块实现多线程时的join方法)等,具体不再展开。
03 ThreadPoolExecutor
ThreadPoolExecutor 是 Executor 的子类,即线程池对象类,用来异步执行调度并发任务。
def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
pass
其中max_workers是最主要和最常用的初始化参数,用于设置最大线程个数,默认为CPU个数乘以5,thread_name_prefix用于设置线程名前缀,后两个初始参数为3.7版本中增加,用于在每个任务初始化时调用一个可选对象,实际一般不用。
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
# used to overlap I/O instead of CPU work.
max_workers = (os.cpu_count() or 1) * 5
执行多线程任务有两种方式,都是继承自父类Executor中的方法,分别是submit()和map()
from concurrent.futures import ThreadPoolExecutor as executor
futures = [executor.submit(fun, arg) for arg in args]#方式1
results = executor.map(fun, args)#方式2
在使用submit执行多线程任务时,每个线程任务返回一个future对象,future对象是一个用于接收单个任务执行结果的对象,其result()方法常用于获取单任务执行结果,例如
futures = [executor.submit(fun, arg) for arg in args]#方式1
results = [future.result() for future in futures]
而在map执行方式中,则是直接返回单个任务执行结果的迭代器。
04 ThreadPoolExecutor
与ThreadPoolExecutor类似,ProcessPoolExecutor进程池也是继承自Executor类的一个子类,且很多调用接口和执行方式与前者几乎一致。
ProcessPoolExecutor官方文档内置配图
def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
pass
if max_workers is None:
self._max_workers = os.cpu_count() or 1
这里,最大进程数默认为CPU核心个数。第二个参数与线程池类不同,是用于初始化一个多进程环境,默认调用multiporcessing模块的get_context方法。
05 并发实战对比
对python多线程和多进程并发任务有所了解的都知道,对于IO密集型任务(如涉及磁盘读写较多的任务、网络响应和传输较多的下载任务等),多线程和多进程都能带来较高的并发效率,但是对于计算密集型(CPU密集型)任务(涉及的任务主要是依赖CPU计算),则多线程一般不会带来效率上的提升,甚至与串行几乎一致。
下面通过两个实例验证这一结论,并测试并发效率
我们以python爬虫请求10次网页为例,分别测试串行、多线程和多进程3种方式的执行时间
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import math
from time import time
import requests
URLS = ['https://www.baidu.com/']*10
def get_baidu(url):
return requests.get(url).text
def multi_thread():
with ThreadPoolExecutor() as executor:
return list(executor.map(get_baidu, URLS))
def multi_process():
with ProcessPoolExecutor() as executor:
return list(executor.map(get_baidu, URLS))
def single():
return list(map(get_baidu, URLS))
if __name__ == '__main__':
start = time()
single()
print("time used by single computing :", time()-start)
start = time()
multi_thread()
print("time used by multi_thread computing :", time()-start)
start = time()
multi_process()
print("time used by multi_process computing :", time()-start)
"""
time used by single computing : 7.0965657234191895
time used by multi_thread computing : 0.41477227210998535
time used by multi_process computing : 1.7192769050598145
"""
这里,我们选用官方demo,即判断一个数是否是质数的案例。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import math
from time import time
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
pass#具体可参照官网或后台回复concurrent下载
def multi_thread():
with ThreadPoolExecutor() as executor:
return list(executor.map(is_prime, PRIMES))
def multi_process():
with ProcessPoolExecutor() as executor:
return list(executor.map(is_prime, PRIMES))
def single():
return list(map(is_prime, PRIMES))
if __name__ == '__main__':
start = time()
single()
print("time used by single computing :", time()-start)
start = time()
multi_thread()
print("time used by multi_thread computing :", time()-start)
start = time()
multi_process()
print("time used by multi_process computing :", time()-start)
"""
time used by single computing : 3.2942192554473877
time used by multi_thread computing : 3.2454559803009033
time used by multi_process computing : 2.2647616863250732
"""
注:以上两个详细源码可在 公众号:小数志 后台回复"concurrent"下载
串行调度计算密集型任务,CPU负载曲线(33%左右)
多线程调度计算密集型任务,CPU负载曲线(33%左右)
多进程调度计算密集型任务,CPU负载曲线(100%左右)
结论:
06 总结
concurrent模块主要类和方法关系图