我使用一个简单的RabbitMQ队列将任务分发给工作进程。每个工作进程同时使用一个multiprocessing
实例池处理多个任务,以尽可能多地使用内存和cpu。
问题是,有些任务比其他任务占用更多的RAM,因此如果启动多个实例,工作进程就会崩溃。但是,当工作人员正在处理RAM密集型任务时,我希望它能够处理其他较小的RAM密集型任务,以使用其他CPU。
一个想法是使用多个队列或主题,但我想知道推荐的方法是什么。在内存错误使进程崩溃之前,我能捕捉到它们吗?
解决这一问题的正确方法是什么?
更新
整个系统将由多个多核机器组成,但在每台多核机器上只有一个工作程序在运行,这就产生了与核一样多的多处理实例。不同的机器应该相互独立,除非它们从同一个队列中获取它们的任务。
发布于 2014-07-30 16:23:22
我认为试图从OOM错误中捕获和恢复是非常困难的,如果不是不可能的话。您需要运行一个线程或进程来不断监视内存使用情况,而当它检测到内存使用情况太高时,does...what是否准确?杀死处理任务的进程?尝试暂停它(如果可能的话;它可能不取决于您的任务正在做什么)。即使如此,暂停也不会释放任何内存。您必须释放内存,并在其安全时重新启动任务,这意味着您必须请求它,决定它何时安全,等等。
与其尝试检测和从问题中恢复,我建议完全避免它。创建两个队列和两个池。一个队列/池用于高内存任务,另一个队列/池用于低内存任务.高内存池中只有一个进程,因此它仅限于同时运行一个任务,从而节省了您的内存。低内存队列将有multiprocessing.cpu_count() - 1
进程,允许您在两个池中保持CPU饱和。
这种方法的一个潜在问题是,如果在低内存任务尚未完成的情况下耗尽了高内存队列,那么您将浪费一个CPU。您可以以非阻塞的方式(或通过超时)处理高内存队列中的消费,这样,当您准备使用任务时,如果高内存队列为空,则可以转而获取一个低内存任务。然后,当您完成处理时,再次检查高内存队列。
就像这样:
import multiprocessing
# hi_q and lo_q are placeholders for whatever library you're using to consume from RabbitMQ
def high_mem_consume():
while True:
task = hi_q.consume(timeout=2)
if not task:
lo_q.consume(timeout=2)
if task:
process_task(task)
def low_mem_consume():
while True:
task = lo_q.consume() # Blocks forever
process_task(task)
if __name__ == "__main__":
hi_pool = multiprocessing.Pool(1)
lo_pool = multiprocessing.Pool(multiprocessing.cpu_count() - 1)
hi_pool.apply_async(high_mem_consume)
lo_pool.apply_async(lo_mem_consume)
https://stackoverflow.com/questions/25000738
复制相似问题