进程:通常一个运行着的应用程序就是一个进程,比如:我启动了一个音乐播放器,现在它就是一个进程。线程:线程是进程的最小执行单元,比如:我在刚启动的音乐播放器上选了一首歌曲进行播放,这就是一个线程。
在多线程一文中,我们说了因为 GIL 的原因,CPython 解释器下的多线程牺牲了并行性,为此 Python 提供了多进程模块 multiprocessing
,该模块同时提供了本地和远程并发,使用子进程代替线程,可以有效的避免 GIL 带来的影响,能够充分发挥机器上的多核优势,可以实现真正的并行效果,并且它与 threading
模块的 API 基本类似,使用起来也比较方便。
multiprocessing
模块通过创建一个 Process
对象然后调用它的 start()
方法来生成进程,Process
与 threading.Thread
API 相同。
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
进程对象,表示在单独进程中运行的活动。参数说明如下:
multiprocessing.Process 对象具有如下方法和属性。
看一个使用多进程的示例。
from multiprocessing import Process
import time, os
def target():
time.sleep(2)
print ('子进程ID:', os.getpid())
if __name__=='__main__':
print ('主进程ID:', os.getpid())
ps = []
for i in range(10):
p = Process(target=target)
p.start()
ps.append(p)
for p in ps:
p.join()
当进程数量比较多时,我们可以利用进程池方便、高效的对进程进行使用和管理。
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
进程池对象。参数说明如下:
有如下两种方式向进程池提交任务:
import multiprocessing, time
def target(p):
print('t')
time.sleep(2)
print(p)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 5)
for i in range(3):
p = 'p%d'%(i)
# 阻塞式
pool.apply(target, (p, ))
# 非阻塞式
# pool.apply_async(target, (p, ))
pool.close()
pool.join()
multiprocessing.Pipe([duplex])
返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端;如果 duplex 被置为 True (默认值),那么该管道是双向的,否则管道是单向的。
from multiprocessing import Pipe, Process
def setData(conn, data):
conn.send(data)
def printData(conn):
print(conn.recv())
if __name__ == "__main__":
data = '程序之间'
# 创建管道返回管道的两端
conn1, conn2 = Pipe()
p1 = Process(target=setData, args=(conn1, data,))
p2 = Process(target=printData, args=(conn2,))
p1.start()
p2.start()
p1.join()
p2.join()
multiprocessing.Queue([maxsize])
返回一个共享队列实例。具有如下方法:
from multiprocessing import Queue, Process
def setData(q, data):
q.put(data)
def printData(q):
print(q.get())
if __name__ == "__main__":
data = '程序之间'
q = Queue()
p1 = Process(target=setData, args=(q, data,))
p2 = Process(target=printData, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
多进程之间不共享数据,但共享同一套文件系统,像访问同一个文件、同一终端打印,如果不进行同步操作,就会出现错乱的现象。
所有在 threading 存在的同步方式,multiprocessing 中都有类似的等价物,如:锁、信号量等。以锁的方式为例,我们来看一个终端打印例子。
不加锁
from multiprocessing import Process
import os,time
def target():
print('p%s is start' %os.getpid())
time.sleep(2)
print('p%s is end' %os.getpid())
if __name__ == '__main__':
for i in range(3):
p=Process(target=target)
p.start()
执行结果:
p7996 is start
p10404 is start
p10744 is start
p7996 is end
p10404 is end
p10744 is end
加锁
from multiprocessing import Process, Lock
import os,time
def target(lock):
lock.acquire()
print('p%s is start' %os.getpid())
time.sleep(2)
print('p%s is end' %os.getpid())
lock.release()
if __name__ == '__main__':
lock = Lock()
for i in range(3):
p=Process(target=target, args=(lock,))
p.start()
执行结果:
p11064 is start
p11064 is end
p1532 is start
p1532 is end
p11620 is start
p11620 is end
并发编程时,通常尽量避免使用共享状态,但如果有一些数据确实需要在进程之间共享怎么办呢?对于这种情况,multiprocessing 模块提供了两种方式。
multiprocessing.Value(typecode_or_type, *args, lock=True)
返回一个从共享内存上创建的对象。参数说明如下:
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
从共享内存中申请并返回一个数组对象。
使用 Value 或 Array 将数据存储在共享内存映射中。
from multiprocessing import Process, Value, Array
def setData(n, a):
n.value = 1024
for i in range(len(a)):
a[i] = -a[i]
def printData(n, a):
print(num.value)
print(arr[:])
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(5))
print(num.value)
print(arr[:])
print('-----------------------')
p = Process(target=setData, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
由 Manager() 返回的管理器对象控制一个服务进程,该进程保存 Python 对象并允许其他进程使用代理操作它们。
Manager() 返回的管理器支持类型包括:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 Array。
from multiprocessing import Process, Manager
def setData(d, l):
d[1] = '1'
d[0.5] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
print(d)
print(l)
print('-----------------------')
p = Process(target=setData, args=(d, l))
p.start()
p.join()
print(d)
print(l)
参考:
https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers