我有一个60 5+的SciPy阵列(矩阵),必须在5+ multiprocessing
Process
对象之间共享。我在SciPy列表上看过numpy-sharedmem和this discussion。似乎有两种方法--numpy-sharedmem
和使用multiprocessing.RawArray()
以及将NumPy dtype
s映射到ctype
s。现在,numpy-sharedmem
似乎是可行的方法,但我还没有看到一个好的参考示例。我不需要任何类型的锁,因为数组(实际上是一个矩阵)将是只读的。现在,由于它的大小,我想避免复制。听起来正确的方法是创建数组的唯一副本作为sharedmem
数组,然后将其传递给Process
对象?有几个具体的问题:
Process()
es的最佳方式是什么?我需要一个队列来传递一个数组吗?一根烟斗会更好吗?我能不能把它作为参数传递给Process()
子类的初始化(我假设它是pickled的)?numpy-sharedmem
不是64位安全的?我肯定在使用一些不是32位addressable.RawArray()
方法上有什么折衷?更慢,更麻烦?如果我可以提供任何其他信息来帮助其他人澄清这一点,请发表评论,我会添加的。谢谢!
它需要在Ubuntu Linux甚至Mac OS上运行,但可移植性并不是一个大问题。
发布于 2013-07-30 23:54:18
@Velimir给出了一个很好的答案。我想我可以添加一些注释和一个小示例。
(我在sharedmem上找不到太多文档--这些是我自己的实验结果。)
Process
使用target
和args
参数。这可能比使用全局变量更好。示例
#!/usr/bin/env python
from multiprocessing import Process
import sharedmem
import numpy
def do_work(data, start):
data[start] = 0;
def split_work(num):
n = 20
width = n/num
shared = sharedmem.empty(n)
shared[:] = numpy.random.rand(1, n)[0]
print "values are %s" % shared
processes = [Process(target=do_work, args=(shared, i*width)) for i in xrange(num)]
for p in processes:
p.start()
for p in processes:
p.join()
print "values are %s" % shared
print "type is %s" % type(shared[0])
if __name__ == '__main__':
split_work(4)
输出
values are [ 0.81397784 0.59667692 0.10761908 0.6736734 0.46349645 0.98340718
0.44056863 0.10701816 0.67167752 0.29158274 0.22242552 0.14273156
0.34912309 0.43812636 0.58484507 0.81697513 0.57758441 0.4284959
0.7292129 0.06063283]
values are [ 0. 0.59667692 0.10761908 0.6736734 0.46349645 0.
0.44056863 0.10701816 0.67167752 0.29158274 0. 0.14273156
0.34912309 0.43812636 0.58484507 0. 0.57758441 0.4284959
0.7292129 0.06063283]
type is <type 'numpy.float64'>
此related question可能会很有用。
发布于 2013-07-22 19:29:15
如果您使用的是Linux (或任何兼容POSIX的系统),则可以将此数组定义为全局变量。当multiprocessing
启动一个新的子进程时,它正在使用Linux上的fork()
。新产生的子进程会自动与其父进程共享内存,只要它不更改内存(copy-on-write机制)。
既然您说“我不需要任何类型的锁,因为数组(实际上是一个矩阵)将是只读的”,利用这种行为将是一种非常简单但非常有效的方法:所有子进程在读取这个大型的numpy数组时将访问物理内存中的相同数据。
不要将您的数组交给Process()
构造函数,这将指示multiprocessing
将数据pickle
到子对象,在您的情况下,这将是非常低效或不可能的。在Linux上,在fork()
之后,子元素是父元素的完全副本,使用相同的物理内存,所以您需要做的就是确保可以从您移交给Process()
的target
函数中访问“包含”矩阵的Python变量。这通常可以通过一个“全局”变量来实现。
示例代码:
from multiprocessing import Process
from numpy import random
global_array = random.random(10**4)
def child():
print sum(global_array)
def main():
processes = [Process(target=child) for _ in xrange(10)]
for p in processes:
p.start()
for p in processes:
p.join()
if __name__ == "__main__":
main()
在不支持fork()
的Windows上,multiprocessing
使用win32应用编程接口调用CreateProcess
。它从任何给定的可执行文件创建一个全新的进程。这就是为什么在Windows上,如果需要在父进程运行时创建的数据,则需要将数据保留到子进程。
发布于 2013-07-29 05:45:22
您可能会对我编写的一小段代码感兴趣:github.com/vmlaker/benchmark-sharedmem
唯一感兴趣的文件是main.py
。这是numpy-sharedmem的一个基准测试--代码只是通过管道将数组( numpy
或sharedmem
)传递给衍生的进程。工作人员只需对数据调用sum()
。我只对比较两种实现之间的数据通信时间感兴趣。
我还编写了另一个更复杂的代码:github.com/vmlaker/sherlock。
在这里,我使用numpy-sharedmem模块通过OpenCV进行实时图像处理--图像是NumPy数组,根据OpenCV的更新的cv2
应用编程接口。图像,实际上是其引用,通过从multiprocessing.Manager
创建的字典对象在进程之间共享(与使用队列或管道相反)。与使用普通的NumPy数组相比,我获得了很大的性能改进。
管道与队列
根据我的经验,使用管道的IPC比使用队列的IPC更快。这是有道理的,因为队列增加了锁定,使其对多个生产者/消费者来说是安全的。管道不需要,但是如果你只有两个进程来回通信,那么使用管道是安全的,或者,正如文档所说:
...同时使用管道的不同末端的进程不会有损坏的风险。
sharedmem
安全
sharedmem
模块的主要问题是在不正常的程序退出时可能发生内存泄漏。在冗长的讨论here中对此进行了描述。虽然在2011年4月10日Sturla提到了内存泄漏的修复,但从那时起,我仍然经历了泄漏,使用了两个repos,Sturla Molden自己在GitHub (github.com/sturlamolden/sharedmem-numpy)上的和Chris Lee-Messer在Bitbucket (bitbucket.org/cleemesser/numpy-sharedmem)上的。
https://stackoverflow.com/questions/17785275
复制相似问题