此前我们介绍了 python 中的多进程包 multiprocessing 以及 signal 包提供的最基本的进程间通信方式 — 信号。 通过 multiprocessing 实现 python 多进程 python 进程间通信(一) — 信号的基本使用 python 进程间通信(二) — 定时信号 SIGALRM
本文,我们来接着介绍 python 中的其他的进程间通信方式 — 进程同步原语及管道与队列。
此前,我们已经介绍了 threading 包中封装的一系列线程同步原语: Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象 python 线程同步(三) — 信号量 python 线程同步(四) — 事件对象与栅栏
所有上述这些同步原语在 multiprocessing 包中都有对应的封装,并且有着一模一样的用法,这里我们就不再赘述了。
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用上面所说的同步原语。 因为锁的粒度难以控制,同时,不可避免的对执行效率产生影响,通信参与方也不那么显而易见。 相比之下,通过消息机制实现进程间通信则要简单明了的多。
python 中提供了两种基于消息的进程间通信方式:
multiprocessing.Pipe(duplex=True)
Pipe 是一个由管道连接的双向通信对象。 构造参数 duplex 指定该管道是否是双向的,并返回一个由两个连接对象构成的元组。 两个连接对象具有 recv 方法和 send 方法,分别用来接收和发送数据,两个不同的进程可以分别使用两个连接对象来发送和接收数据。 但需要注意的是,如果两个不同的进程尝试同时读或写同一个连接对象,则管道中的数据可能会被损坏。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv())
p.join()
上述代码我们创建了一个管道,并让父进程和子进程分别持有两个管道创建返回的连接对象来进程通信。 运行程序,打印出了:
[42, None, ’hello’]
multiprocessing 包中的 Queue 类是数据结构中 FIFO 队列的实现,包含下面三种实现:
Queue 和 JoinableQueue 以及所有 Queue 的子类,都依赖操作系统实现信号量,如果操作系统没有信号量的实现,则在实例化一个队列时,会抛出 ImportError。
下面我们详细来介绍一下。
Queue 是通过一个管道实现的 FIFO 进程间通信结构,它实现了标准库 queue.Queue 除 task_done 和 join 之外的所有方法。 他是通过一个后台线程将用户放入或取出数据的请求传递给管道,受此异步实现的影响,所有判断队列中元素数量的方法,包括判断队列是否已满或是否为空的方法返回的数值可能都是不准确的。
__init__(self, maxsize=0, *, ctx)
构造方法传入一个数字,用来标识队列的最大容量,默认的 0 表示不限容量。
qsize()
返回队列当前元素数,很多类 Unix 环境中调用该方法会抛出 NotImplementedError,因为他们没有实现 sem_getvalue 方法。 由于具体实现中,向队列中放入元素和信号量值的实际变化是异步的,所以这个方法的返回并不十分准确。
有两个方法可以向队列插入元素;
put 方法将元素 obj 放入队列,同时可以指定这一过程是否需要阻塞等待以及超时,如果非阻塞且队列已满或超时后,会抛出 queue.Full。 put_nowait 相当于 put(obj, False)
与插入元素的两个方法一样,获取元素也有两个方法:
get 方法从队列中取出一个元素,如果非阻塞且队列为空或超时后,会抛出 queue.Empty。 get_nowait 相当于 get(False)
empty()
返回 bool 值,队列为空则返回 True。
full()
返回 bool 值,队列已满则返回 True。
close()
close() 方法一旦调用,则队列不允许再放入任何数据,当所有数据都被刷入管道,后台线程就会退出。
join_thread()
等待后台进程刷新数据,只有调用过 close 方法以后才可以调用该方法。 调用该方法后,进程会一直阻塞,直到所有数据都被刷入队列中的管道,队列的后台线程退出。
cancel_join_thread()
默认的,进程在向队列插入数据后退出,都会自动等待后台线程刷新 pipe。 在此之前,通过 cancel_join_thread 可以改变这一默认的行为,即使进程主动调用 join_thread 方法,也会立即返回,作为代价,数据可能并没有被真的写入管道中从而导致数据的丢失。
这是一个简单而通用的队列实现,他可以被认为是一个加锁的管道,与另两个 multiprocessing 提供的队列不同,他可以在没有实现信号量的操作系统中使用。 他实现的方法非常少,只有下面三个:
JoinableQueue 是 Queue 的一个子类,他与 Queue 的区别在于它实现了 task_done 和 join 方法。
__init__(self, maxsize=0, *, ctx)
与 Queue 一样,JoinableQueue 的构造方法也支持传入一个数值,用于指定队列的最大容量,为 0 则不限制容量。
task_done()
task_done 方法标识最近一次 get 得到的元素已经被处理,则下次 get 方法将返回新数据。
join()
阻塞,直到队列中所有元素都被接收且被 task_done 方法标识为处理完毕。
我们将上面的管道的例子改为使用队列:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get())
p.join()
打印出了:
[42, None, ’hello’]