前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多进程并行计算

多进程并行计算

原创
作者头像
用户11021319
发布2024-04-02 10:11:04
1170
发布2024-04-02 10:11:04

问题背景

我有了一个 Python 脚本,我想用它作为另一个 Python 脚本的控制器。我的服务器有 64 个处理器,所以我想要同时启动最多 64 个此第二个 Python 脚本的子进程。子脚本称为:

$ python create_graphs.py --name=NAME

其中 NAME 类似于 XYZ、ABC、NYU 等。 在我的父控制器脚本中,我从列表中检索名称变量:

my_list = [ ‘XYZ’, ‘ABC’, ‘NYU’ ]

我的问题是,以子进程身份启动这些进程的最佳方法是什么?我希望将子进程的数量限制在每次 64 个,因此需要跟踪状态(子进程是否已完成),以便能够有效地保持整个生成过程的运行。 我研究过使用 subprocess 包,但拒绝了它,因为它一次只能生成一个子进程。我最终找到了 multiprocessor 包,但我不得不承认被整个线程与子进程文档搞得不知所措。 目前,我的脚本使用 subprocess.call 一次只生成一个子进程,如下所示:

#!/path/to/python import subprocess, multiprocessing, Queue from multiprocessing import Process

my_list = [ ‘XYZ’, ‘ABC’, ‘NYU’ ]

if name == ‘main’: processors = multiprocessing.cpu_count()

代码语言:javascript
复制
for i in range(len(my_list)):
    if( i < processors ):
         cmd = ["python", "/path/to/create_graphs.py", "--name="+ my_list[i]]
         child = subprocess.call( cmd, shell=False )

我真的想一次生成 64 个子进程。在其他 stackoverflow 问题中,我看到人们使用 Queue,但它似乎会产生性能影响?

解决方案

您可以使用 multiprocessing 中的进程池类来实现多进程并行计算。

代码语言:javascript
复制
import multiprocessing
import subprocess

def work(cmd):
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':
    count = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=count)
    print pool.map(work, ['ls'] * count)

这是一个使用进程池和 subprocess.call 的简单示例。

或者,您可以使用多线程来实现并行计算,在这里推荐使用 threading.Thread 类来创建线程,并使用 join() 方法来同步它们。

代码语言:python
复制
import threading

def work(name):
    print 'Processing station:', name
    print 'Parent process:', os.getppid()
    print 'Process id:', os.getpid()
    cmd = [ "/bin/bash" "/path/to/executable/create_graphs.sh","--name=%s" % (name) ]
    return subprocess.call(cmd, shell=False)

if __name__ == '__main__':

    my_list = [ 'XYZ', 'ABC', 'NYU' ]

    my_list.sort()

    print my_list

    # Get the number of processors available
    num_processes = multiprocessing.cpu_count()

    threads = []

    len_stas = len(my_list)

    print "+++ Number of stations to process: %s" % (len_stas)

    # run until all the threads are done, and there is no data left

    for list_item in my_list:

        # if we aren't using all the processors AND there is still data left to
        # compute, then spawn another thread

        if( len(threads) < num_processes ):

            p = multiprocessing.Process(target=work,args=[list_item])

            p.start()

            print p, p.is_alive()

            threads.append(p)

        else:

            for thread in threads:

                if not thread.is_alive():

                    threads.remove(thread)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题背景
  • 解决方案
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档