众所周知,pathos.multiprocessing优于Python中的multiprocessing库,因为前者使用dill而不是pickle,并且可以序列化范围更广的函数和其他东西。
但是,当涉及到使用pool.map()按行编写pathos结果时,就会遇到一些麻烦。如果ProcessPool中的所有进程都将结果按行写入单个文件,则它们会相互干扰,同时写入一些行并破坏作业。在使用普通的multiprocessing包时,我能够让进程写入它们自己的单独文件(用当前进程id命名),如下所示:
example_data = range(100)
def process_point(point):
output = "output-%d.gz" % mpp.current_process().pid
with gzip.open(output, "a+") as fout:
fout.write('%d\n' % point**2)然后,这段代码运行良好:
import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)但这段代码并没有:
from pathos import multiprocessing as mpp
pool = mpp.Pool(8)
pool.map(process_point, example_data)并抛出AttributeError
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-10-a6fb174ec9a5> in <module>()
----> 1 pool.map(process_point, example_data)
/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in map(self, func, iterable, chunksize)
128 '''
129 assert self._state == RUN
--> 130 return self.mapAsync(func, iterable, chunksize).get()
131
132 def imap(self, func, iterable, chunksize=1):
/usr/local/lib/python2.7/dist-packages/processing-0.52_pathos-py2.7-linux-x86_64.egg/processing/pool.pyc in get(self, timeout)
371 return self._value
372 else:
--> 373 raise self._value
374
375 def _set(self, i, obj):
AttributeError: 'module' object has no attribute 'current_process'在current_process()中没有pathos,我找不到类似于它的任何东西。有什么想法吗?
发布于 2015-09-14 17:40:22
我是pathos的作者。虽然您的答案适用于这种情况,但最好在multiprocessing中使用pathos中的分叉,这是在相当迟钝的位置:pathos.helpers.mp中找到的。
这为您提供了与multiprocessing的一对一映射,但具有更好的序列化。因此,您将使用pathos.helpers.mp.current_process。
对不起,这既没有文件也没有明显的…我至少应该改进这两个问题中的一个。
发布于 2015-09-14 16:01:17
这个简单的技巧似乎起了作用:
import multiprocessing as mp
from pathos import multiprocessing as pathos_mp
import gzip
example_data = range(100)
def process_point(point):
output = "output-%d.gz" % mp.current_process().pid
with gzip.open(output, "a+") as fout:
fout.write('%d\n' % point**2)
pool = pathos_mp.Pool(8)
pool.map(process_point, example_data)换句话说,我们可以使用pathos进行并行计算,使用普通的multiprocessing包获取当前进程的id,这将正确工作!
https://stackoverflow.com/questions/32568514
复制相似问题