我从单个python代码中运行多个进程:
代码片段:
while 1:
if sqsObject.msgCount() > 0:
ReadyMsg = sqsObject.readM2Q()
if ReadyMsg == 0:
continue
fileName = ReadyMsg['fileName']
dirName = ReadyMsg['dirName']
uuid = ReadyMsg['uid']
guid = ReadyMsg['guid']
callback = ReadyMsg['callbackurl']
# print ("Trigger Algorithm Process")
if(countProcess < maxProcess):
try:
retValue = Process(target=dosomething, args=(dirName, uuid,guid,callback))
processArray.append(retValue)
retValue.start()
countProcess = countProcess + 1
except:
print "Cannot Run Process"
else:
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
continue
else:
try:
#print 'Restart Process'
processArray[i] = Process(target=dosomething, args=(dirName,uuid,guid,callback))
processArray[i].start()
except:
print "Cannot Run Process"
else: # No more request to service
for i in range(len(processArray)):
if (processArray[i].is_alive() == True):
processRunning = 1
break
else:
continue
if processRunning == 0:
countProcess = 0
else:
processRunning = 0
在这里,我从队列中读取消息,并创建一个进程来运行该消息的算法。我要设置maxProcess的上限。因此,在到达maxProcess之后,我希望通过检查is_alive()来重用不活动的processArray插槽。
但是,对于数量较少的进程来说,这个过程运行良好,但是对于大量消息(例如100条消息)来说,内存消耗会达到顶点。我认为我通过重用进程插槽而有漏洞。
不知道在这个过程中出了什么问题。
预先感谢您发现错误或明智的建议。
发布于 2016-08-08 04:51:58
不知道在这个过程中出了什么问题。
似乎您创建的进程与消息一样多,即使在到达maxProcess计数时也是如此。
我认为我通过重用进程插槽而有漏洞。
没有必要自己管理流程。只需使用过程池
# before your while loop starts
from multiprocessing import Pool
pool = Pool(processes=max_process)
while 1:
...
# instead of creating a new Process
res = pool.apply_async(dosomething,
args=(dirName,uuid,guid,callback))
# after the while loop has finished
# -- wait to finish
pool.close()
pool.join()
提交作业的方法
请注意,班级支持几种提交作业的方法:
如果消息到达得足够快,最好一次收集多条消息(例如每次收集10条或100条,具体取决于实际处理),并使用map
一次向目标函数提交“小批处理”:
...
while True:
messages = []
# build mini-batch of messages
while len(messages) < batch_size:
... # get message
messages.append((dirName,uuid,guid,callback))
pool.map_async(dosomething, messages)
为了避免dosomething
留下的内存泄漏,您可以要求池在进程消耗了一些消息之后重新启动它:
max_tasks = 5 # some sensible number
Pool(max_processes, maxtasksperchild=max_tasks)
走向分布式
如果使用这种方法仍然超出内存容量,请考虑使用分布式方法,即添加更多的机器。使用芹菜将非常直接,来自于上面的内容:
# tasks.py
@task
def dosomething(...):
... # same code as before
# driver.py
while True:
... # get messages as before
res = somefunc.apply_async(args=(dirName,uuid,guid,callback))
https://stackoverflow.com/questions/38821247
复制相似问题