首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >Python中的多进程,同时限制正在运行的进程的数量

Python中的多进程,同时限制正在运行的进程的数量
EN

Stack Overflow用户
提问于 2012-08-17 06:59:54
回答 3查看 15.9K关注 0票数 19

我希望同时运行多个program.py实例,同时限制同时运行的实例数量(例如,限制系统上可用的CPU核心数量)。例如,如果我有10个内核,总共需要运行1000次program.py,那么在任何给定时间都只会创建并运行10个实例。

我尝试过使用多处理模块、多线程和队列,但对我来说,似乎没有任何一种方法适合于简单的实现。我遇到的最大问题是找到一种方法来限制同时运行的进程的数量。这一点很重要,因为如果我一次创建1000个进程,它就相当于一个分叉炸弹。我不需要从进程以编程方式返回结果(它们输出到磁盘),并且这些进程都是彼此独立运行的。

有没有人能给我提个建议或者举个例子,告诉我如何用python或者bash来实现这个功能?我会使用队列发布我到目前为止编写的代码,但它不能按预期工作,而且可能已经走错了路。

非常感谢。

EN

回答 3

Stack Overflow用户

发布于 2012-08-17 08:18:01

您应该使用流程主管。一种方法是使用Circus提供的应用程序接口来“编程”,文档站点现在离线了,但我认为这只是一个暂时的问题,无论如何,你可以使用马戏团来处理这个问题。另一种方法是使用supervisord并将进程的参数numprocs设置为您拥有的内核数量。

使用Circus的示例:

代码语言:javascript
复制
from circus import get_arbiter

arbiter = get_arbiter("myprogram", numprocesses=3)
try:
    arbiter.start()
finally:
    arbiter.stop()
票数 3
EN

Stack Overflow用户

发布于 2012-08-17 08:04:56

Bash脚本而不是Python,但我经常使用它来进行简单的并行处理:

代码语言:javascript
复制
#!/usr/bin/env bash
waitForNProcs()
{
 nprocs=$(pgrep -f $procName | wc -l)
 while [ $nprocs -gt $MAXPROCS ]; do
  sleep $SLEEPTIME
  nprocs=$(pgrep -f $procName | wc -l)
 done
}
SLEEPTIME=3
MAXPROCS=10
procName=myPython.py
for file in ./data/*.txt; do
 waitForNProcs
 ./$procName $file &
done

或者对于非常简单的情况,另一个选项是xargs,其中P设置procs的数量

代码语言:javascript
复制
find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
票数 2
EN

Stack Overflow用户

发布于 2017-12-29 21:16:51

虽然有很多关于使用multiprocessing.pool的答案,但关于如何使用multiprocessing.Process的代码片段并不多,当内存使用很重要时,这确实更有好处。启动1000个进程将使CPU过载并杀死内存。如果每个进程及其数据管道都是内存密集型的,则操作系统或Python本身将限制并行进程的数量。我开发了下面的代码来限制同时批量提交给CPU的作业数量。批处理大小可以与CPU核心的数量成比例地缩放。在我的windows电脑上,每批作业的效率最高可达CPU的4倍。

代码语言:javascript
复制
import multiprocessing
def func_to_be_multiprocessed(q,data):
    q.put(('s'))
q = multiprocessing.Queue()
worker = []
for p in range(number_of_jobs):
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \
        args=(q,data)...))
num_cores = multiprocessing.cpu_count()
Scaling_factor_batch_jobs = 3.0
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs
num_of_batches = number_of_jobs // num_jobs_per_batch
for i_batch in range(num_of_batches):
    floor_job = i_batch * num_jobs_per_batch
    ceil_job  = floor_job + num_jobs_per_batch
    for p in worker[floor_job : ceil_job]:
                                         worker.start()
    for p in worker[floor_job : ceil_job]:
                                         worker.join()
for p in worker[ceil_job :]:
                           worker.start()
for p in worker[ceil_job :]:
                           worker.join()
for p in multiprocessing.active_children():
                           p.terminate()
result = []
for p in worker:
   result.append(q.get())

唯一的问题是,如果任何批处理中的任何作业无法完成并导致挂起情况,则批处理的其余作业将不会启动。因此,要处理的函数必须具有适当的错误处理例程。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/11996632

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档