前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 中的进程池与线程池 -- Future 与 Executor

python 中的进程池与线程池 -- Future 与 Executor

作者头像
用户3147702
发布2022-06-27 13:37:36
9410
发布2022-06-27 13:37:36
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

上一篇文章中,我们介绍了 Python multiprocessing 包中提供的强大的进程池组件。 python 中的进程池 — multiprocessing.pool.Pool

说到并发编程,熟悉 java 的同学一定对 java 中简单易用的 Future 类设计十分了解,python 吸收了 java 的这一设计,也提供了 Future 类来提供便利的并发编程。 python 中 Future 最大的优势在于他将进程池、线程池与异步IO并发编程全部统一到同一套工具中,使用者只需要通过参数进行选择即可,极大地降低了使用者的学习成本与编程难度,本文我们就来详细介绍一下 python 中并发编程的重要组件 — 线程/进程池的使用。

2. Future 类

python3.4 在两个包里引入了 Future 类:

  1. concurrent.futures.Future — 用于实现进程池/线程池并发
  2. asyncio.Future — 用于实现基于异步 IO 的并发

Future 类的实例表示延迟任务本身,通过 Future 实例,我们就可以拿到异步执行的计算任务的返回。 通常情况下,我们不应主动创建 Future 实例,因为顾名思义,Future 对象表示未来需要做的事情,只有在排定排期后才应该被创建。 例如 concurrent.futures.Executor 类的 submit 方法会为传入的可调用方法排期,从而返回一个 Future 对象。 同样,我们也不应该去改变 Future 实例的对象,因为只有任务执行的过程才能够影响 Future 对象的状态。

2.1. Future 的类方法

2.1.1. 询问是否已完成 — done

done(self)

该方法立即返回一个布尔值,用来表示当前可调用对象是否已经完成调用。

2.2. 添加执行完成后的回调 — add_done_callback

add_done_callback(self, fn)

除了主动通过 done 方法轮询,我们也可以通过 add_done_callback 方法设置任务执行完成后自动调用的回调方法。

2.3. 获取任务执行结果 — result

result(self, timeout=0)

result 方法用来获取任务的执行结果,如果任务执行过程中抛出了异常,则在 result 方法被调用时会重新抛出该异常。 如果任务尚未返回,result 方法会阻塞等待,concurrent 包中的该方法提供了一个 timeout 参数,用来传递一个超时时间,如果在超时时间后仍未获取到结果,则会抛出 TimeoutError 异常,但 asyncio 包中的 Future 类的 result 方法则并未提供 timeout 参数。

3. 进程池与线程池 — Executor

上面我们提到了 Executor,我们不应该自己创建 Future 对象,而是应该通过 Executor 来生成。 concurrent 包中有两个类继承自 Executor,分别是:

  • ThreadPoolExecutor — 线程池
  • ProcessPoolExecutor — 进程池

他们分别维护了一个任务队列来控制并发编程,同时隐藏大量细节,让使用者在使用中足够简单。

3.1. 提交任务 — submit

submit(fn, args, *kwargs)

提交一个任务,返回一个 Future 对象用来接收执行结果。

3.1.1. 示例

下面的例子展示了将 15 次任务的执行提交给拥有 10 个进程的进程池,并获取返回。

代码语言:javascript
复制
import loggingimport osfrom concurrent.futures import *from time import sleepdef current_sleep(i):
    sleep(3)    return '%s.%s over sleep' % (i, os.getppid())if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
    results = []    with ThreadPoolExecutor(10) as executor:
        futrues = []        for i in range(15):
            futrues.append(executor.submit(current_sleep, i+1))        for futrue in futrues:
            logging.info(futrue.result())

打印出了:

2019-07-02 15:02:49,758 - INFO: 1.16992 over sleep 2019-07-02 15:02:49,760 - INFO: 2.964 over sleep 2019-07-02 15:02:49,760 - INFO: 3.1688 over sleep 2019-07-02 15:02:49,761 - INFO: 4.20708 over sleep 2019-07-02 15:02:49,762 - INFO: 5.25720 over sleep 2019-07-02 15:02:49,762 - INFO: 6.3792 over sleep 2019-07-02 15:02:49,765 - INFO: 7.21468 over sleep 2019-07-02 15:02:49,765 - INFO: 8.16480 over sleep 2019-07-02 15:02:49,854 - INFO: 9.13940 over sleep 2019-07-02 15:02:50,694 - INFO: 10.24184 over sleep 2019-07-02 15:02:52,758 - INFO: 11.16992 over sleep 2019-07-02 15:02:52,761 - INFO: 12.1688 over sleep 2019-07-02 15:02:52,761 - INFO: 13.964 over sleep 2019-07-02 15:02:52,762 - INFO: 14.20708 over sleep 2019-07-02 15:02:52,762 - INFO: 15.25720 over sleep

可以看到,前 10 个进程在同时返回,此后其中五个进程执行任务并在 sleep 完成执行后返回。

3.2. 通过迭代器提交和返回任务 — map

map(func, *iterables, timeout=None)

map 方法与 multiprocessing.pool.Pool 中的 map 方法是一样的,将 iterable 参数传入的可迭代对象传递给不同的进程来处理,返回所有结果收集后的可迭代对象。 可以通过 timeout 参数限制任务执行的超时,一旦超时,则会触发 TimeoutError 异常。 如果任务执行过程中抛出了异常,map 方法并不会将异常抛出,只有在获取结果时才会抛出。

3.2.1. 示例

下面是通过 map 方法重新编写的上述示例。

代码语言:javascript
复制
import loggingimport osfrom concurrent.futures import *from time import sleepdef current_sleep(i):
    sleep(3)    return '%s.%s over sleep' % (i, os.getpid())if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')    with ProcessPoolExecutor(10) as executor:
        results = executor.map(current_sleep, range(1, 16))        for result in results:
            logging.info(result)

打印出了:

2019-07-02 15:11:17,526 - INFO: 1.15512 over sleep 2019-07-02 15:11:17,533 - INFO: 2.19212 over sleep 2019-07-02 15:11:17,538 - INFO: 3.6428 over sleep 2019-07-02 15:11:17,538 - INFO: 4.16516 over sleep 2019-07-02 15:11:17,542 - INFO: 5.6924 over sleep 2019-07-02 15:11:17,542 - INFO: 6.25212 over sleep 2019-07-02 15:11:17,542 - INFO: 7.22456 over sleep 2019-07-02 15:11:17,543 - INFO: 8.16536 over sleep 2019-07-02 15:11:17,623 - INFO: 9.10108 over sleep 2019-07-02 15:11:18,452 - INFO: 10.17748 over sleep 2019-07-02 15:11:20,526 - INFO: 11.15512 over sleep 2019-07-02 15:11:20,534 - INFO: 12.19212 over sleep 2019-07-02 15:11:20,539 - INFO: 13.6428 over sleep 2019-07-02 15:11:20,539 - INFO: 14.16516 over sleep 2019-07-02 15:11:20,541 - INFO: 15.22456 over sleep

虽然任务是在不同时间先后完成的,但只有当全部完成或超时才会返回。

3.3. 关闭进程/线程池 — shutdown

shutdown(wait=True)

关闭进程/线程池,此后进程/线程池不再接受 map 或 submit 调用,否则将触发 RuntimeError。 如果 wait 为 True,则阻塞等待进程/线程池关闭后返回,否则立即返回。

4. 多进程 vs 多线程

此前我们介绍了 Python 中的 GIL 锁,受此影响,Python 每一个时刻只能调度一个线程,这意味着并发并没有真的在进行。 而多进程则不同,多进程并发的模式中,由于进程间严格的隔离,他们得以真正的并行执行。 同时,Python 多进程让多核 CPU 得以被利用。

但相比多线程机制,多进程的模式也存在一些缺点和不足:

  1. 进程切换更为耗时
  2. 进程间通信相比线程间共享的数据更为复杂

因此,IO 密集型操作尽量使用 ThreadPoolExecutor 来执行,而对于 ProcessPoolExecutor,则只有在安装有多个 CPU 的高性能计算机上执行 CPU 密集型任务时,具有较大优势。

5. Executor vs threading/multiprocessing

ThreadPoolExecutor 与 ProcessPoolExecutor 分别实现了简单易用的线程池与进程池,但他们只是使用方法上的封装,底层仍然是通过调用 threading 与 multiprocessing 来实现的。 对于相对简单的模式,通过 Executor 即可完成,那么使用 threading/multiprocessing 就显得过于复杂,但很多情况下,我们需要进行线程同步、进程间通信等复杂的需求,此时就只能使用 threading/multiprocessing 来实现了。

6. 后记

在 python 中 Future 类被封装在两个包中:

  • concurrent.futures
  • asyncio

本文我们详细介绍了并发环境下,concurrent.futures 包中提供的进程池与线程池组件的用法,asyncio 包则实现了任务的异步执行,具体的使用方法敬请关注主页君的下一篇文章,谢谢。

7. 参考资料

《流畅的 python》。 https://docs.python.org/3.4/library/concurrent.futures.html#threadpoolexecutor-example。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. Future 类
    • 2.1. Future 的类方法
      • 2.1.1. 询问是否已完成 — done
    • 2.2. 添加执行完成后的回调 — add_done_callback
      • 2.3. 获取任务执行结果 — result
      • 3. 进程池与线程池 — Executor
        • 3.1. 提交任务 — submit
          • 3.1.1. 示例
        • 3.2. 通过迭代器提交和返回任务 — map
          • 3.2.1. 示例
        • 3.3. 关闭进程/线程池 — shutdown
        • 4. 多进程 vs 多线程
        • 5. Executor vs threading/multiprocessing
        • 6. 后记
        • 7. 参考资料
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档