此前一系列文章中,我们介绍了 Python 的threading 包中的一系列工具。 python 的线程 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象 python 线程同步(三) — 信号量 python 线程同步(四) — 事件对象与栅栏
threading 包为 Python 提供了线程模型,而 multiprocessing 包则为另一种并发模型 — 多进程模型提供了强大的解决方案。 multiprocessing 与 threading 十分相似,他提供了基本的进程对象类以及功能强大的进程同步工具,同时,multiprocessing 还提供了进程池的封装类 Pool。
此前我们介绍了 Python 中的 GIL 锁,受此影响,Python 每一个时刻只能调度一个线程,这意味着并发并没有真的在进行。 而多进程则不同,多进程并发的模式中,由于进程间严格的隔离,他们得以真正的并行执行。 同时,Python 多进程让多核 CPU 得以被利用。
但相比多线程机制,多进程的模式也存在一些缺点和不足:
方法 | 描述 |
---|---|
active_children | 返回当前进程存活的子进程的列表 |
cpu_count | 返回系统的 CPU 数量,但并不是当前进程可用的数量,len(os.sched_getaffinity(0)) 方法获取的是当前进程可用的数量 |
current_process | 获取当前进程的 Process 对象 |
get_all_start_methods | 返回支持的启动方法的列表,该列表的首项即为默认选项,包括我们后面即将要介绍的 ’fork’, ’spawn’ 和 ’forkserver’ |
get_context | 返回进程上下文 Context 对象 |
get_start_method | 获取当前启动进程的启动方法,’fork’ , ’spawn’ , ’forkserver’ 或者 None(如果没有设置) |
set_executable | 设置在启动子进程时使用的 Python 解释器路径,例如:set_executable(os.path.join(sys.exec_prefix, ’pythonw.exe’)) |
set_start_method | 设置启动子进程的方法 ’fork’ , ’spawn’ 或者 ’forkserver’ |
你会发现 Process 类与 Thread 类十分相似,他们都通过 start 方法启动并开始执行 run 方法的内容,同时,join 用来阻塞等待某个进程完成执行。 但是不同的是,这些方法只能由被调用进程的父进程来调用。
需要注意的是,正如我们上面所说,start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由创建进程对象的进程调用。
import logging
from multiprocessing import Process
from time import sleep, ctime
class myProcess(Process):
def __init__(self, nsec):
super().__init__()
self.nsec = nsec
def run(self):
logging.info('start_sleep[%s]' % self.nsec)
sleep(self.nsec + 1)
logging.info('end_sleep[%s]' % self.nsec)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
logging.info('start at %s'% ctime())
processes = list()
for i in range(5):
t = myProcess(i)
processes.append(t)
for process in processes:
process.start()
for process in processes:
process.join()
logging.info('end at %s' % ctime())
打印出了:
2019-05-22 10:02:24,705 - INFO: start at Wed May 22 10:02:24 2019 2019-05-22 10:02:24,739 - INFO: start_sleep[2] 2019-05-22 10:02:24,740 - INFO: start_sleep[1] 2019-05-22 10:02:24,742 - INFO: start_sleep[4] 2019-05-22 10:02:24,743 - INFO: start_sleep[0] 2019-05-22 10:02:24,743 - INFO: start_sleep[3] 2019-05-22 10:02:25,745 - INFO: end_sleep[0] 2019-05-22 10:02:26,746 - INFO: end_sleep[1] 2019-05-22 10:02:27,747 - INFO: end_sleep[2] 2019-05-22 10:02:28,748 - INFO: end_sleep[3] 2019-05-22 10:02:29,749 - INFO: end_sleep[4] 2019-05-22 10:02:29,750 - INFO: end at Wed May 22 10:02:29 2019
import logging
from multiprocessing import Process
from time import sleep, ctime
def sleep_func(pindex):
logging.info('start_sleep[%s]' % pindex)
sleep(pindex + 1)
logging.info('end_sleep[%s]' % pindex)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
logging.info('start at %s' % ctime())
processes = list()
for i in range(5):
process = Process(target=sleep_func, args=[i])
processes.append(process)
for process in processes:
process.start()
for process in processes:
process.join()
logging.info('end at %s' % ctime())
打印出了:
2019-05-22 10:07:30,340 - INFO: start at Wed May 22 10:07:30 2019 2019-05-22 10:07:30,346 - INFO: start_sleep[2] 2019-05-22 10:07:30,348 - INFO: start_sleep[3] 2019-05-22 10:07:30,350 - INFO: start_sleep[0] 2019-05-22 10:07:30,351 - INFO: start_sleep[4] 2019-05-22 10:07:30,349 - INFO: start_sleep[1] 2019-05-22 10:07:31,353 - INFO: end_sleep[0] 2019-05-22 10:07:32,353 - INFO: end_sleep[1] 2019-05-22 10:07:33,354 - INFO: end_sleep[2] 2019-05-22 10:07:34,353 - INFO: end_sleep[3] 2019-05-22 10:07:35,353 - INFO: end_sleep[4] 2019-05-22 10:07:35,354 - INFO: end at Wed May 22 10:07:35 2019
可以看到,multiprocessing 的用法与 threading 中的用法简直是一模一样。
根据不同的平台,multiprocessing 有三种启动进程的方法:
通过 multiprocessing.set_start_method 方法,可以设置不同的启动方法:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
需要注意的是,在程序中 set_start_method() 不应该被多次调用,不同上下文启动的进程可能是不兼容的,比如使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。
下面我们来对比一下多进程、多线程运行 CPU 密集型任务的耗时情况:
import time
from multiprocessing import Process
from threading import Thread
def count(x, y):
# 使程序完成50万计算
c = 0
while c < 500000:
c += 1
x += x
y += y
if __name__ == '__main__':
t = time.time()
for x in range(10):
count(1, 1)
print("Line", time.time() - t)
counts = []
t = time.time()
for x in range(10):
thread = Thread(target=count, args=(1, 1))
counts.append(thread)
thread.start()
for thread in counts:
thread.join()
print("Threading", time.time() - t)
counts = []
t = time.time()
for x in range(10):
process = Process(target=count, args=(1, 1))
counts.append(process)
process.start()
for process in counts:
process.join()
print("Multiprocess", time.time() - t)
程序非常简单,我们分别进行 50 万次计算,得到结果如下:
Line 69.52206325531006 Threading 55.799378633499146 Multiprocess 44.240989685058594
多进程运行确实有着性能优势,但也没有我们想象中那么大。