首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用进程的Python多处理:占用大量内存

使用进程的Python多处理:占用大量内存
EN

Stack Overflow用户
提问于 2016-08-08 04:18:55
回答 2查看 1.2K关注 0票数 0

我从单个python代码中运行多个进程:

代码片段:

代码语言:javascript
运行
复制
while 1:
   if sqsObject.msgCount() > 0:
        ReadyMsg = sqsObject.readM2Q()
        if ReadyMsg == 0:
            continue
        fileName = ReadyMsg['fileName']
        dirName  = ReadyMsg['dirName']
        uuid         = ReadyMsg['uid']
        guid         = ReadyMsg['guid']
        callback     = ReadyMsg['callbackurl']

        # print ("Trigger Algorithm Process")
        if(countProcess < maxProcess):

           try:
             retValue = Process(target=dosomething, args=(dirName, uuid,guid,callback))
             processArray.append(retValue)
             retValue.start()
             countProcess = countProcess + 1
           except:
             print "Cannot Run Process"
        else:
           for i in range(len(processArray)):
              if (processArray[i].is_alive() == True):
                 continue
              else:
                 try:
                    #print 'Restart Process'
                    processArray[i] = Process(target=dosomething, args=(dirName,uuid,guid,callback))
                    processArray[i].start()
                 except:
                    print "Cannot Run Process"


   else: # No more request to service

       for i in range(len(processArray)):
            if (processArray[i].is_alive() == True):
                processRunning = 1
                break
            else:
                continue

      if processRunning == 0:
           countProcess = 0

      else:
           processRunning = 0

在这里,我从队列中读取消息,并创建一个进程来运行该消息的算法。我要设置maxProcess的上限。因此,在到达maxProcess之后,我希望通过检查is_alive()来重用不活动的processArray插槽。

但是,对于数量较少的进程来说,这个过程运行良好,但是对于大量消息(例如100条消息)来说,内存消耗会达到顶点。我认为我通过重用进程插槽而有漏洞。

不知道在这个过程中出了什么问题。

预先感谢您发现错误或明智的建议。

EN

Stack Overflow用户

发布于 2016-08-08 04:51:58

不知道在这个过程中出了什么问题。

似乎您创建的进程与消息一样多,即使在到达maxProcess计数时也是如此。

我认为我通过重用进程插槽而有漏洞。

没有必要自己管理流程。只需使用过程池

代码语言:javascript
运行
复制
 # before your while loop starts
 from multiprocessing import Pool
 pool = Pool(processes=max_process)
 while 1:
   ...
   # instead of creating a new Process
   res = pool.apply_async(dosomething, 
                          args=(dirName,uuid,guid,callback)) 
 # after the while loop has finished
 # -- wait to finish
 pool.close()
 pool.join()

提交作业的方法

请注意,班级支持几种提交作业的方法:

  • apply_async -一次只传递一条消息
  • map_async -一次一段消息

如果消息到达得足够快,最好一次收集多条消息(例如每次收集10条或100条,具体取决于实际处理),并使用map一次向目标函数提交“小批处理”:

代码语言:javascript
运行
复制
...
while True:
    messages = []
    # build mini-batch of messages
    while len(messages) < batch_size:
        ... # get message
        messages.append((dirName,uuid,guid,callback))
    pool.map_async(dosomething, messages)

为了避免dosomething留下的内存泄漏,您可以要求池在进程消耗了一些消息之后重新启动它:

代码语言:javascript
运行
复制
max_tasks = 5 # some sensible number
Pool(max_processes, maxtasksperchild=max_tasks)

走向分布式

如果使用这种方法仍然超出内存容量,请考虑使用分布式方法,即添加更多的机器。使用芹菜将非常直接,来自于上面的内容:

代码语言:javascript
运行
复制
# tasks.py
@task
def dosomething(...):
   ... # same code as before

# driver.py
  while True:
     ... # get messages as before
     res = somefunc.apply_async(args=(dirName,uuid,guid,callback))  
票数 0
EN
查看全部 2 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38821247

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档