前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python并发之concurrent快速入门

python并发之concurrent快速入门

作者头像
luanhz
发布2020-04-01 09:48:30
3.5K0
发布2020-04-01 09:48:30
举报
文章被收录于专栏:小数志

导读:我很笨,但是我很快——计算机之所以计算能力如此出众,不在于其有多智能,而是因为它超快的执行速度,而多核心则可以进一步成倍的提高效率。在python中,concurrent库就是用于完成并发的模块之一。

01 初识concurrent

concurrent库是python内置模块之一,基于threading和multiprocessing两个模块实现,并对二者进行了很好的封装和集成,使其拥有更加简洁易用的接口函数,无需再考虑start()、join()、lock()等问题。

打开concurrent模块(默认安装位于..\Python\Python37\Lib),发现当前其仅内置了一个futures子模块,而futures子模块中,则有3个重要的.py文件,其中_base.py是最主要的模块,提供了大部分并发功能,但属于私有模块,不能被其他程序直接import,另外两个则是process和thread模块,即多进程和多线程,二者均调用_base实现主要并发接口函数。

concurrent英文原义为"并发的",futures英文原义为"未来",模块取名concurrent很好理解(java中有同名包),而子模块取名futures则用以表示未来有待完成的任务,似乎也正体现了多线程/多进程中任务队列的含义。

注:关于多线程和多进程的理解和区别本文不予展开,网上有很多通俗易懂的讲解可供查找学习。

02 Executor

Executor是concurrent.futures模块的抽象类,但一般不直接调用,而是为线程池和进程池提供了一个父类,即ThreadPoolExecutor和ProcessPoolExecutor均继承自Executor。

Executor虽然不直接调用,但却提供了几个非常重要的接口供其子类继承

  • submit(fn, *args, **kwarg):用于调用并发任务,其中参数fn是执行任务的函数,通过fn(*args **kwargs)的形式执行单个任务,返回Future对象
  • map(func, *iterables, timeout=None, chunksize=1):类似于python全局函数map,将可迭代对象异步并行映射给func函数,并返回一个新的可迭代结果。其中可通过timeout设置允许最大单个任务的延时,chunksize用于在多进程中设置分组规模,在多线程中无意义
  • shutdown(wait=True),用于在任务完成后释放所调用的资源,其中wait参数默认为True,表示当前任务执行完毕且释放已分配资源后才返回,wait设置为False时则执行shutdown后立即返回,实际不怎么应用的到。后文将会提到,由于excutor支持上下文管理器with方法,所以可避免显式调用shutdown函数。

Executor的这几个方法中,submit()和map()也是ThreadPoolExecutor和ProcessPoolExecutor两个子类的常用方法。

另外,与Executor同在_base.py模块中定义的还有future类(调度并发任务后生成对象,用于获取单个任务信息)、wait()方法(其功能类似利用threading模块实现多线程时的join方法)等,具体不再展开。

03 ThreadPoolExecutor

ThreadPoolExecutor 是 Executor 的子类,即线程池对象类,用来异步执行调度并发任务。

  • 初始化
代码语言:javascript
复制
def __init__(self, max_workers=None, thread_name_prefix='', initializer=None, initargs=()):
    pass

其中max_workers是最主要和最常用的初始化参数,用于设置最大线程个数,默认为CPU个数乘以5,thread_name_prefix用于设置线程名前缀,后两个初始参数为3.7版本中增加,用于在每个任务初始化时调用一个可选对象,实际一般不用。

代码语言:javascript
复制
if max_workers is None:
    # Use this number because ThreadPoolExecutor is often
    # used to overlap I/O instead of CPU work.
    max_workers = (os.cpu_count() or 1) * 5
  • 执行多线程任务

执行多线程任务有两种方式,都是继承自父类Executor中的方法,分别是submit()和map()

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor as executor
futures = [executor.submit(fun, arg) for arg in args]#方式1
results = executor.map(fun, args)#方式2
  • 获取多线程调用结果

在使用submit执行多线程任务时,每个线程任务返回一个future对象,future对象是一个用于接收单个任务执行结果的对象,其result()方法常用于获取单任务执行结果,例如

代码语言:javascript
复制
futures = [executor.submit(fun, arg) for arg in args]#方式1
results = [future.result() for future in futures]

而在map执行方式中,则是直接返回单个任务执行结果的迭代器。

  • submit与map对比:二者均可用于执行线程池任务并返回结果,区别是后者直接返回执行结果;而前者返回一个future对象,在future对象中,除了可用其result()方法获得执行结果外,还有详细的方法来获取和设置任务状态,如
    • cancel():尝试取消调用
    • cancelled():如果调用被成功取消返回True
    • running():如果当前正在被执行不能被取消返回True
    • done():如果调用被成功取消或者完成running返回True

04 ThreadPoolExecutor

与ThreadPoolExecutor类似,ProcessPoolExecutor进程池也是继承自Executor类的一个子类,且很多调用接口和执行方式与前者几乎一致。

ProcessPoolExecutor官方文档内置配图

  • 初始化
代码语言:javascript
复制
def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
    pass
    if max_workers is None:
        self._max_workers = os.cpu_count() or 1

这里,最大进程数默认为CPU核心个数。第二个参数与线程池类不同,是用于初始化一个多进程环境,默认调用multiporcessing模块的get_context方法。

  • 执行多进程任务:用submit或map方法,具体与多线程调用方式一致
  • 获取执行结果:与多线程获取结果方式一致

05 并发实战对比

对python多线程和多进程并发任务有所了解的都知道,对于IO密集型任务(如涉及磁盘读写较多的任务、网络响应和传输较多的下载任务等),多线程和多进程都能带来较高的并发效率,但是对于计算密集型(CPU密集型)任务(涉及的任务主要是依赖CPU计算),则多线程一般不会带来效率上的提升,甚至与串行几乎一致。

下面通过两个实例验证这一结论,并测试并发效率

  • IO密集型

我们以python爬虫请求10次网页为例,分别测试串行、多线程和多进程3种方式的执行时间

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import math
from time import time
import requests

URLS = ['https://www.baidu.com/']*10

def get_baidu(url):
    return requests.get(url).text

def multi_thread():
    with ThreadPoolExecutor() as executor:
        return list(executor.map(get_baidu, URLS))

def multi_process():
    with ProcessPoolExecutor() as executor:
        return list(executor.map(get_baidu, URLS))

def single():
    return list(map(get_baidu, URLS))

if __name__ == '__main__':
    start = time()
    single()
    print("time used by single computing :", time()-start)
    start = time()
    multi_thread()
    print("time used by multi_thread computing :", time()-start)
    start = time()
    multi_process()
    print("time used by multi_process computing :", time()-start)
"""
time used by single computing : 7.0965657234191895
time used by multi_thread computing : 0.41477227210998535
time used by multi_process computing : 1.7192769050598145
"""
  • 计算密集型

这里,我们选用官方demo,即判断一个数是否是质数的案例。

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import math
from time import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    pass#具体可参照官网或后台回复concurrent下载

def multi_thread():
    with ThreadPoolExecutor() as executor:
        return list(executor.map(is_prime, PRIMES))

def multi_process():
    with ProcessPoolExecutor() as executor:
        return list(executor.map(is_prime, PRIMES))

def single():
    return list(map(is_prime, PRIMES))

if __name__ == '__main__':
    start = time()
    single()
    print("time used by single computing :", time()-start)
    start = time()
    multi_thread()
    print("time used by multi_thread computing :", time()-start)
    start = time()
    multi_process()
    print("time used by multi_process computing :", time()-start)
"""
time used by single computing : 3.2942192554473877
time used by multi_thread computing : 3.2454559803009033
time used by multi_process computing : 2.2647616863250732
"""

注:以上两个详细源码可在 公众号:小数志 后台回复"concurrent"下载

串行调度计算密集型任务,CPU负载曲线(33%左右)

多线程调度计算密集型任务,CPU负载曲线(33%左右)

多进程调度计算密集型任务,CPU负载曲线(100%左右)

结论:

  • 对于IO密集型任务,多线程可发挥巨大威力,甚至执行效率超过了多进程执行方式(案例中多线程效率超过多进程的原因有二:一是进程间切换相较于线程切换带来更大开销和时延,二是默认初始化参数中多线程数量是CPU核心数的5倍,而多进程数量等于CPU核心数)
  • 对于计算密集型任务,多线程由于仅调用单个CPU进行计算,所以效率与串行几乎一致,而多进程由于可以调用多个CPU的计算能力,效率要更高一些。但由于进程间切换需要开销,故其与串行效率的比值达不到核心个数(经测试,数据量足够大时,效率比值接近CPU核心数)。

06 总结

concurrent模块主要类和方法关系图

  • python自带concurrent模块实现了对多线程threading模块和多进程multiprocessing模块的高度封装和集成,使用极为方便
  • ThreadPoolExecutor类和ProcessPoolExecutor类均继承自Executor父类,二者初始化方式略有区别,但调度并发任务和获取执行结果方式几乎一致
  • 2种调度并发任务的方式:submit()和map()
  • submit()相比map而言,具有更丰富的任务定制方法
  • IO密集型任务多线程和多进程均能带来较高执行效率,而计算密集型任务则仅多进程能带来实际提升
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小数志 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档