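I'm facing the following problem. I'm trying to parallelize a function that updates a file, but I can't start the Pool() because of an OSError: [Errno 12] Cannot allocate memory. I've started looking around on the server, and it's not as if I'm using an old, weak machine or have run out of actual memory. See htop: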
Also, free -m shows that I have plenty of RAM available, in addition to ~7GB of swap:
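The files I want to work with aren't that big either. I'll paste my code (and the stack trace) below. The sizes are as follows: the predictionmatrix DataFrame used takes up about 80MB according to pandas DataFrame.memory_usage(), and the file geo.geojson is 2MB.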
How do I go about debugging this? What can I check, and how? Thanks for any tips/tricks!
Code:
import json
from functools import partial
from multiprocessing import Pool

def parallelUpdateJSON(paramMatch, predictionmatrix, data):
    for feature in data['features']:
        currentfeature = predictionmatrix[(predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
        if (len(currentfeature) > 0):
            feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
        else:
            feature['properties'].update({"style": {"opacity": 0}})

def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
    with open('geo.geojson') as f:
        data = json.load(f)
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
    pool = Pool()
    func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
    pool.map(func, data)
    pool.close()
    pool.join()
    with open('output.geojson', 'w') as outfile:
        json.dump(data, outfile)
Stack trace:
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-428-d6121ed2750b> in <module>()
----> 1 writeGeoJSON(6, 15, baseline)
<ipython-input-427-973b7a5a8acc> in writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix)
14 print("Start loop")
15 paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
---> 16 pool = Pool(2)
17 func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
18 print(predictionmatrix.memory_usage())
/usr/lib/python3.5/multiprocessing/context.py in Pool(self, processes, initializer, initargs, maxtasksperchild)
116 from .pool import Pool
117 return Pool(processes, initializer, initargs, maxtasksperchild,
--> 118 context=self.get_context())
119
120 def RawValue(self, typecode_or_type, *args):
/usr/lib/python3.5/multiprocessing/pool.py in __init__(self, processes, initializer, initargs, maxtasksperchild, context)
166 self._processes = processes
167 self._pool = []
--> 168 self._repopulate_pool()
169
170 self._worker_handler = threading.Thread(
/usr/lib/python3.5/multiprocessing/pool.py in _repopulate_pool(self)
231 w.name = w.name.replace('Process', 'PoolWorker')
232 w.daemon = True
--> 233 w.start()
234 util.debug('added worker')
235
/usr/lib/python3.5/multiprocessing/process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 _children.add(self)
/usr/lib/python3.5/multiprocessing/context.py in _Popen(process_obj)
265 def _Popen(process_obj):
266 from .popen_fork import Popen
--> 267 return Popen(process_obj)
268
269 class SpawnProcess(process.BaseProcess):
/usr/lib/python3.5/multiprocessing/popen_fork.py in __init__(self, process_obj)
18 sys.stderr.flush()
19 self.returncode = None
---> 20 self._launch(process_obj)
21
22 def duplicate_for_child(self, fd):
/usr/lib/python3.5/multiprocessing/popen_fork.py in _launch(self, process_obj)
65 code = 1
66 parent_r, child_w = os.pipe()
---> 67 self.pid = os.fork()
68 if self.pid == 0:
69 try:
OSError: [Errno 12] Cannot allocate memory
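The traceback ends in os.fork(), so the ENOMEM most likely comes from the kernel refusing to create the child process rather than from Python running out of heap. On Linux, whether a large fork() is allowed depends on the overcommit policy in /proc/sys/vm/overcommit_memory (0 = heuristic, 1 = always overcommit, 2 = strict accounting against CommitLimit). A minimal sketch for checking these values, assuming the server runs Linux (the /usr/lib/python3.5 paths suggest it does); parse_meminfo and fork_memory_report are hypothetical helper names, not part of any library:

```python
def parse_meminfo(text):
    """Parse /proc/meminfo-style text into {field: integer value}.

    Most fields are in kB; a few (e.g. HugePages_Total) are plain counts.
    """
    info = {}
    for line in text.splitlines():
        key, _, rest = line.partition(':')
        parts = rest.split()
        if parts:
            info[key.strip()] = int(parts[0])
    return info


def fork_memory_report():
    """Collect the values that decide whether a large fork() can be committed (Linux only)."""
    with open('/proc/sys/vm/overcommit_memory') as f:
        mode = int(f.read())  # int() tolerates the trailing newline
    with open('/proc/meminfo') as f:
        info = parse_meminfo(f.read())
    return {
        'overcommit_memory': mode,  # 0 heuristic, 1 always, 2 strict
        'MemAvailable_kB': info.get('MemAvailable'),
        'CommitLimit_kB': info.get('CommitLimit'),
        'Committed_AS_kB': info.get('Committed_AS'),
    }
```

In strict mode (2), an allocation, including the memory fork() has to reserve for the child, is refused once Committed_AS would exceed CommitLimit, even while free -m still reports free RAM.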
Update
Following @robyschek's solution, I updated my code to:
import json
from functools import partial
from multiprocessing import Pool

# Module-level global, filled in once per worker by worker_init()
g_predictionmatrix = None

def worker_init(predictionmatrix):
    global g_predictionmatrix
    g_predictionmatrix = predictionmatrix

def parallelUpdateJSON(paramMatch, data_item):
    for feature in data_item['features']:
        currentfeature = g_predictionmatrix[(g_predictionmatrix['SId']==feature['properties']['cellId']) & paramMatch]
        if (len(currentfeature) > 0):
            feature['properties'].update({"style": {"opacity": currentfeature.AllActivity.item()}})
        else:
            feature['properties'].update({"style": {"opacity": 0}})

def use_the_pool(data, paramMatch, predictionmatrix):
    pool = Pool(initializer=worker_init, initargs=(predictionmatrix,))
    func = partial(parallelUpdateJSON, paramMatch)
    pool.map(func, data)
    pool.close()
    pool.join()

def writeGeoJSON(weekdaytopredict, hourtopredict, predictionmatrix):
    with open('geo.geojson') as f:
        data = json.load(f)
    paramMatch = (predictionmatrix['Hour']==hourtopredict) & (predictionmatrix['Weekday']==weekdaytopredict)
    use_the_pool(data, paramMatch, predictionmatrix)
    with open('trentino-grid.geojson', 'w') as outfile:
        json.dump(data, outfile)
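I still get the same error. Also, according to the documentation, map() should split my data into chunks, so I don't think it should copy my 80MB DataFrame once per row. But I may be wrong... :) Also, I noticed that if I use a smaller input (~11MB instead of 80MB), I don't get the error. So I guess I'm trying to use too much memory, but I can't imagine how 80MB turns into more than the 16GB of memory can handle.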
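One way to see where the memory goes: pandas' memory_usage() measures only the DataFrame, whereas fork() duplicates the whole interpreter process, including everything else the session holds (the traceback shows an IPython session already at input 428). A minimal Linux-only sketch that reads the current process's sizes from /proc/self/status; parse_proc_status and current_process_memory are hypothetical helper names:

```python
def parse_proc_status(text):
    """Return the Vm* fields of /proc/<pid>/status as {field: size in kB}."""
    sizes = {}
    for line in text.splitlines():
        key, _, rest = line.partition(':')
        parts = rest.split()
        if key.startswith('Vm') and parts:
            sizes[key] = int(parts[0])
    return sizes


def current_process_memory():
    # Linux only. VmRSS is the resident size, VmSize the total virtual size.
    with open('/proc/self/status') as f:
        return parse_proc_status(f.read())
```

Calling current_process_memory()['VmRSS'] right before creating the Pool shows how large the parent process actually is, which is the figure that matters for fork(), rather than the 80MB of the DataFrame alone.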
Posted on 2017-08-10 09:56:53
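We've run into this a few times. According to my sysadmin, there is a "bug" in Unix where the same error is raised if your process reaches its maximum number of file descriptors.
We had a file descriptor leak, and the error raised was OSError: [Errno 12] Cannot allocate memory.
So you should look into your script and double-check whether the problem is actually caused by creating too many FDs instead.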
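To check whether descriptors are leaking, compare the number of open descriptors with the per-process limit. A minimal sketch, assuming Linux (it relies on /proc/self/fd and the Unix-only resource module); fd_usage is a hypothetical helper name:

```python
import os
import resource


def fd_usage():
    """Return (open_fds, soft_limit, hard_limit) for the current process.

    Linux only: counts the entries in /proc/self/fd. Listing that directory
    briefly opens one extra descriptor, which is included in the count.
    """
    open_fds = len(os.listdir('/proc/self/fd'))
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    return open_fds, soft, hard
```

Calling fd_usage() before and after each pool.close()/pool.join() cycle should show whether the count keeps growing, for example when pools from earlier notebook runs are never closed.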
Posted on 2017-03-03 12:31:31
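On Linux, the default way multiprocessing.Pool starts its worker processes is fork. The problem with fork is that the whole parent process is duplicated (see details here). So if your main process is already using a lot of memory, that memory is duplicated as well, which leads to this error. For instance, if your main process uses 2GB of memory and you use 8 subprocesses, you will need 18GB of RAM.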
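The arithmetic in that example is one full copy for the parent plus one per worker. A sketch of that upper bound (worst_case_fork_memory is a hypothetical name). On Linux, fork() shares pages copy-on-write, so real physical usage is usually lower, but the figure indicates how much memory the kernel may have to reserve when overcommit accounting is strict:

```python
def worst_case_fork_memory(parent_gb, n_workers):
    """Memory needed if every forked worker ended up with a full private copy of the parent."""
    return parent_gb * (1 + n_workers)


print(worst_case_fork_memory(2, 8))  # 18 (GB), the example from the answer
```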
You should try a different start method, such as 'forkserver' or 'spawn'.
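from functools import partial
from multiprocessing import set_start_method, Pool

if __name__ == '__main__':
    # Call at most once per program, before any Pool is created
    set_start_method('forkserver')
    # You can then start your Pool without each process
    # cloning your entire memory
    pool = Pool()
    func = partial(parallelUpdateJSON, paramMatch, predictionmatrix)
    pool.map(func, data)
    pool.close()
    pool.join()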
These methods avoid duplicating your Process's workspace, but they may be slower to start, because the modules you are using have to be reloaded.
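If set_start_method() has already been called in the same session (for example when a notebook cell is re-run), a second call raises RuntimeError. A variant that avoids the global setting is to request a context object with multiprocessing.get_context() and create the pool from it. A minimal sketch, assuming 'forkserver' is preferred where available (it is Unix-only) with 'spawn' as the fallback; choose_start_method and run_pool are hypothetical helper names:

```python
import multiprocessing


def choose_start_method():
    """Prefer 'forkserver' (Unix only); otherwise fall back to 'spawn'."""
    available = multiprocessing.get_all_start_methods()
    return 'forkserver' if 'forkserver' in available else 'spawn'


def run_pool(func, items, processes=None):
    # A context object carries its own start method, so this does not touch
    # (or clash with) the global default set by set_start_method().
    ctx = multiprocessing.get_context(choose_start_method())
    with ctx.Pool(processes) as pool:
        return pool.map(func, items)
```

For example, run_pool(abs, [-1, -2, 3]) returns [1, 2, 3]. As with set_start_method('forkserver'), the call belongs under an if __name__ == '__main__': guard, and the mapped function must be importable by the worker processes, which generally rules out functions defined interactively in an IPython session.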
https://stackoverflow.com/questions/42584525