我试图迭代100,000多个图像,捕获一些图像特性,并将产生的dataFrame存储在磁盘上,作为一个泡菜文件。
不幸的是,由于RAM的限制,我不得不将图像分割成20,000块,并在将结果保存到磁盘之前对它们执行操作。
下面编写的代码应该在开始循环之前保存20,000个图像的结果数据,以处理接下来的20,000幅图像。
然而,这似乎并没有解决我的问题,因为内存在第一个for循环结束时没有从RAM中释放出来。
所以在处理第5万条记录的某个地方,程序会因为内存不足而崩溃。
在将对象保存到磁盘并调用垃圾收集器之后,我尝试删除这些对象,但是RAM的使用似乎并没有下降。
我遗漏了什么?
#file_list_1 contains 100,000 images
file_list_chunks = list(divide_chunks(file_list_1,20000))
for count,f in enumerate(file_list_chunks):
# make the Pool of workers
pool = ThreadPool(64)
results = pool.map(get_image_features,f)
# close the pool and wait for the work to finish
list_a, list_b = zip(*results)
df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
del list_a
del list_b
del df
gc.collect()
pool.close()
pool.join()
print("pool closed")
发布于 2019-05-22 04:52:14
现在,它可能是在第50000个非常大的东西,这导致了OOM,所以,为了测试这一点,我首先尝试:
file_list_chunks = list(divide_chunks(file_list_1,20000))[30000:]
如果在10,000次失败,这将确认20k是否太大,或者如果它再次在5万次失败时,代码会出现问题.
好的,在密码里.
首先,不需要显式的list
构造函数,在python中迭代要好得多,而不是将整个列表生成到内存中。
file_list_chunks = list(divide_chunks(file_list_1,20000))
# becomes
file_list_chunks = divide_chunks(file_list_1,20000)
我认为您可能在这里滥用ThreadPool:
防止任何其他任务被提交到池中。一旦完成了所有任务,工作进程就会退出。
这看上去好像close
可能还有一些思考仍然在运行,虽然我认为这是安全的,但我觉得有点非pythonic的感觉更好,最好为ThreadPool使用上下文管理器:
with ThreadPool(64) as pool:
results = pool.map(get_image_features,f)
# etc.
python 实际上并没有保证释放内存中的显式实际上并没有保证释放内存。
您应该在连接后收集/在with之后收集:
with ThreadPool(..):
...
pool.join()
gc.collect()
你也可以试着把它分成更小的块,比如10,000甚至更小!
锤子1
有一件事,我会考虑在这里做,而不是使用熊猫DataFrames和大列表是使用SQL数据库,您可以在本地使用sqlite3。
import sqlite3
conn = sqlite3.connect(':memory:', check_same_thread=False) # or, use a file e.g. 'image-features.db'
并使用上下文管理器:
with conn:
conn.execute('''CREATE TABLE images
(filename text, features text)''')
with conn:
# Insert a row of data
conn.execute("INSERT INTO images VALUES ('my-image.png','feature1,feature2')")
这样,我们就不必处理大列表对象或DataFrame了。
你可以把连接传递到每个线程..。你可能会有一些奇怪的事情,比如:
results = pool.map(get_image_features, zip(itertools.repeat(conn), f))
然后,在计算完成之后,您可以从数据库中选择所有您喜欢的格式。例如使用sql。
锤子2
这里使用一个子进程,而不是在python "shell“的同一个实例中将其运行到另一个实例中。
因为您可以将开始和结束传递到python作为sys.args,所以您可以将这些切片:
# main.py
# a for loop to iterate over this
subprocess.check_call(["python", "chunk.py", "0", "20000"])
# chunk.py a b
for count,f in enumerate(file_list_chunks):
if count < int(sys.argv[1]) or count > int(sys.argv[2]):
pass
# do stuff
这样,子进程将正确地清理python (不可能有内存泄漏,因为进程将被终止)。
我敢打赌,Hammer 1是要走的路,感觉就像在粘很多数据,不必要地将其读入python列表,而使用sqlite3 (或其他一些数据库)则完全避免了这种情况。
发布于 2019-05-18 08:03:15
备注:这不是一个答案,而是一个快速列出的问题和建议
ThreadPool()
from multiprocessing.pool
吗?这并不是很好的文档(在python3
中),我宁愿使用ThreadPoolExecutor (也参见这里)。sys.getsizeof()
返回所有声明的globals()
的列表,以及它们的内存占用。del results
(虽然我猜这不应该太大)发布于 2019-05-23 08:15:28
您的问题是,您所使用的线程应该使用多处理(CPU绑定与IO绑定)。
我会像这样重构你的代码:
from multiprocessing import Pool
if __name__ == '__main__':
cpus = multiprocessing.cpu_count()
with Pool(cpus-1) as p:
p.map(get_image_features, file_list_1)
然后,我将修改函数get_image_features
,将(类似于)这两行添加到它的末尾。我不知道您是如何处理这些图像的,但想法是在每个进程中处理每个图像,然后立即将其保存到磁盘中:
df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
因此,dataframe将被腌制并保存在每个进程中,而不是在它退出之后。进程一退出就会从内存中清除出来,因此这应该可以使内存占用保持较低。
https://stackoverflow.com/questions/56126062
复制相似问题