首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何改变并行进程的数量?

如何改变并行进程的数量?
EN

Stack Overflow用户
提问于 2015-04-21 15:28:57
回答 2查看 122关注 0票数 6

我有一个python脚本,它并行运行一个方法。

代码语言:javascript
复制
parsers = {
    'parser1': parser1.process,
    'parser2': parser2.process
}

def process((key, value)):
    parsers[key](value)

pool = Pool(4)
pool.map(process_items, items)

process_items是我的方法,items是一个元组列表,每个元组包含两个元素。items列表有大约100 k项。

然后,process_items将根据给定的参数调用方法。我的问题是,我可以用高并行度运行列表的70%,但其他30%只能使用1/2线程运行,否则将导致超出我控制范围的失败。

因此,在我的代码中,我有大约10个不同的解析器进程。比如1-8,我想运行池(4),但9-10池(2)。

优化这个问题的最佳方法是什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2015-04-21 16:08:12

我认为你最好的选择是在这里使用两个池:

代码语言:javascript
复制
from multiprocessing import Pool
# import parsers here

parsers = {
    'parser1': parser1.process,
    'parser2': parser2.process,
    'parser3': parser3.process,
    'parser4': parser4.process,
    'parser5': parser5.process,
    'parser6': parser6.process,
    'parser7': parser7.process,
}

# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}

def process_items(key, value):
    parsers[key](value)

def run_pool(func, items, num_items, check_set):
    pool = Pool(num_items)
    out = pool.map(func, (item for item in items if item[0] in check_set))
    pool.close()
    pool.join()
    return out

if __name__ == "__main__":
    items = [('parser2', x), ...] # Your list of tuples
    # Process with high parallelism
    high_results = run_pool(process_items, items, 4, high_par)
    # Process with low parallelism
    low_results = run_pool(process_items, items, 2, low_par)

通过巧妙地使用同步原语,在一个Pool中尝试这样做是可能的,但我不认为它最终会看起来比这个更干净。它的运行效率也可能会降低,因为有时您的池需要等待工作才能完成,因此它可以处理一个低并行项,即使队列中有高并行项可用。

如果需要将每个process_items调用的结果按原来可迭代性的相同顺序获得,这将变得有点复杂,这意味着来自每个Pool的结果需要合并,但根据您的示例,我认为这不是必需的。如果是的话,请告诉我,我会尝试相应地调整我的答案。

票数 2
EN

Stack Overflow用户

发布于 2015-04-21 15:44:07

可以在multiprocessing.Pool的构造函数中指定并行线程数。

代码语言:javascript
复制
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(5)   # 5 is the number of parallel threads
    print pool.map(f, [1, 2, 3])
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29776654

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档