上一篇文章中,我们介绍了如何通过 multiprocessing 进行多进程并发编程。 通过 multiprocessing 实现 python 多进程
本文,我们来介绍一下 multiprocessing 中提供的进程池组件 — Pool。
通过上一篇文章中所介绍的 multiprocessing 开辟进程的方式来实现并发编程存在下面的几个问题:
解决上述问题最简单的方式就是池化执行,由进程池来管理并复用若干个进程,就可以解决上述的所有问题,既限制了同时最大的并发进程数,也避免了反复开辟与回收的资源浪费,保证了最大的资源利用效率。 multiprocessing 提供了进程池组件 — Pool,让我们方便的创建一个进程池。
multiprocessing.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
进程池最重要的就是使用了,但需要注意的是,所有下面这些方法都必须由创建进程池的进程调用。
apply(func, args=None, kwds=None)
同步执行函数 func。
import loggingimport osfrom multiprocessing.pool import Poolfrom time import sleepdef f():
logging.info('f %s' % os.getpid())
sleep(1)if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s') with Pool(4) as pool:
pool.apply(f)
pool.apply(f)
pool.apply(f)
pool.apply(f)
pool.apply(f)
打印出了:
2019-06-26 16:25:20,028 - INFO: f 198 2019-06-26 16:25:21,035 - INFO: f 199 2019-06-26 16:25:22,040 - INFO: f 200 2019-06-26 16:25:23,043 - INFO: f 201 2019-06-26 16:25:24,045 - INFO: f 198
可以看到,前4次调用 f 依次使用了进程池中的四个不同的进程,最后一次调用复用了第一个进程,每次调用都等待了 1 秒钟。
apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None)
上面的例子中,每次调用都间隔了 1 秒钟,没有实现真正的并发,所以我们需要异步执行所有的调用。 apply_async 就是 apply 的异步版本。 参数与 apply 大体相同,增加了可选的执行完成后自动调用的回调方法参数。
import loggingimport osimport timefrom multiprocessing.pool import Poolfrom time import sleepdef f():
sleep(1) return '%s finish f_call at %s' % (os.getpid(), time.strftime('%Y-%m-%d %H:%M:%S'))if __name__ == '__main__':
result = [] with Pool(4) as pool:
result.append(pool.apply_async(f))
result.append(pool.apply_async(f))
result.append(pool.apply_async(f))
result.append(pool.apply_async(f))
result.append(pool.apply_async(f)) for res in result:
logging.info(res.get(timeout=10))
打印出了:
2019-06-26 18:26:10,563 - INFO: 7212 finish f_call at 2019-06-26 18:26:10 2019-06-26 18:26:10,563 - INFO: 5440 finish f_call at 2019-06-26 18:26:10 2019-06-26 18:26:10,672 - INFO: 7044 finish f_call at 2019-06-26 18:26:10 2019-06-26 18:26:11,577 - INFO: 7212 finish f_call at 2019-06-26 18:26:11 2019-06-26 18:26:11,577 - INFO: 5440 finish f_call at 2019-06-26 18:26:11
通过异步,我们做到了真正的并发调用。
map(func, iterable, chunksize=0)
与 Python 标准库中的 map 方法有着相同的用法和功能,不同的是,进程池中的该方法会将 iterable 参数传入的可迭代对象分成 chunksize 份传递给不同的进程来处理。
import loggingimport osfrom multiprocessing.pool import Poolfrom time import sleepimport timedef f(i):
print('%s get %s at %s' % (os.getpid(), i, time.strftime('%Y-%m-%d %H:%M:%S')))
sleep(1)if __name__ == '__main__': with Pool(4) as pool:
pool.map(f, range(10))
打印出了:
107 get 0 at 2019-06-27 19:50:30 109 get 2 at 2019-06-27 19:50:30 108 get 1 at 2019-06-27 19:50:30 110 get 3 at 2019-06-27 19:50:30 107 get 4 at 2019-06-27 19:50:31 109 get 5 at 2019-06-27 19:50:31 108 get 6 at 2019-06-27 19:50:31 110 get 7 at 2019-06-27 19:50:31 107 get 8 at 2019-06-27 19:50:32 109 get 9 at 2019-06-27 19:50:32
可以看到,进程池中的四个进程在同一时刻实现了并发调用,随后并发等待1秒后进行下一轮并发调用。 与 apply 的同步调用相比,性能有了很大幅度的提升了。
map_async(func, iterable, chunksize=0, callback=None, error_callback=None)
与 apply_async 类似,map_async 是 map 的异步版本,我们可以通过他返回的对象的阻塞调用 get 方法来获取进程执行后的结果,与 apply_async 不同的是,map_async 会先收集多个进程的运行结果后返回。
imap(func, iterable, chunksize=0)
有时,我们调用 map 传入的可迭代对象的可迭代次数会非常多,如果通过 map 来进行任务的分配和回收,显然会因为计算量过大而出现过度耗时的情况。 此前的文章中,我们介绍过生成器与协程,是否可以借助协程的思想通过迭代器与 next 方法逐步获取结果呢?python 进程池已经考虑到这一情况,并引入了 imap 方法,来返回一个迭代器,通过 next 方法逐步拿到其运行结果。 imap 方法与 map 方法在参数上是一模一样的,不同之处仅在于其返回的结果。 他返回的结果对象是一个迭代器,可以通过向标准库 next 方法传入该迭代器来迭代结果,也可以调用迭代器本身提供的 next 方法来获取结果,值得一提,迭代器本身提供的 next 方法允许传入一个整数的 timeout 参数作为最大超时。
import loggingimport osfrom multiprocessing.pool import Poolfrom time import sleepimport timedef f(i):
sleep(1) return '%s finsh sleep by %s at %s' % (os.getpid(), i, time.strftime('%Y-%m-%d %H:%M:%S'))if __name__ == '__main__': with Pool(4) as pool:
it = pool.imap(f, range(10))
res = next(it) while res:
print(res)
res = next(it)
打印出了:
181 get 0 at 2019-06-27 20:02:13 183 get 2 at 2019-06-27 20:02:13 182 get 1 at 2019-06-27 20:02:13 184 get 3 at 2019-06-27 20:02:13 181 get 4 at 2019-06-27 20:02:14 181 finsh sleep by 0 at 2019-06-27 20:02:14 183 get 5 at 2019-06-27 20:02:14 182 get 6 at 2019-06-27 20:02:14 184 get 7 at 2019-06-27 20:02:14 182 finsh sleep by 1 at 2019-06-27 20:02:14 183 finsh sleep by 2 at 2019-06-27 20:02:14 184 finsh sleep by 3 at 2019-06-27 20:02:14 181 get 8 at 2019-06-27 20:02:15 181 finsh sleep by 4 at 2019-06-27 20:02:15 183 get 9 at 2019-06-27 20:02:15 183 finsh sleep by 5 at 2019-06-27 20:02:15 182 finsh sleep by 6 at 2019-06-27 20:02:15 184 finsh sleep by 7 at 2019-06-27 20:02:15 181 finsh sleep by 8 at 2019-06-27 20:02:16 183 finsh sleep by 9 at 2019-06-27 20:02:16 Traceback (most recent call last): File "/usr/lib/python3.6/multiprocessing/pool.py", line 720, in next item = self._items.popleft() IndexError: pop from an empty deque During handling of the above exception, another exception occurred: Traceback (most recent call last): File "x.py", line 21, in <module> res = next(it) File "/usr/lib/python3.6/multiprocessing/pool.py", line 723, in next raise StopIteration StopIteration
我们看到,每一轮调用都是 next 方法触发的,和我们曾经介绍过的一样,最终迭代器的最后一次 next 调用会抛出 StopIteration 异常。
正如我们可以给进程发送 SIGINT 与 SIGTERM 两种信号来关闭进程或强制终止进程,进程池也提供了两种终止的方法。
close()
等待进程池中所有已分配任务执行结束后退出。
terminate()
立即强制中止所有进程并退出。
join()
进程池同时也提供了 join 方法,用来阻塞等待直到进程池中所有进程均执行结束。
multiprocessing 中进程池的使用,与我们上一篇文章中讲述的 multiprocessing 子进程创建并执行并发请求从本质上与风格上都是一致的,只是对我们的程序编写来说简化了大量的管理与操作的代码,让我们将目光完全集中于实际任务的编写。 使用过 java 的 future 对象的同学一定会觉得 multiprocessing 提供了这么多不同类型的执行方法让人有些难以选择,而隐藏了具体细节的 Future 则显得更加抽象和易用。 python 的设计也参考了 java 中的设计,实现了 Futrue 对象,同时统一了进程池与线程池的用法,敬请期待下一篇文章我们的详细介绍。