首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python多处理返回以块大小为单位设置的结果

Python多处理返回以块大小为单位设置的结果
EN

Stack Overflow用户
提问于 2020-10-22 20:05:05
回答 1查看 262关注 0票数 1

我想用一个名为get_scores_dataframe的函数来处理存储在file_list中的大量csv文件。此函数接受存储在另一个列表中的第二个参数phenotypes。然后,该函数将结果写回csv文件。我设法使用ProcessPoolExecutor()并行化了这项任务,因此,它可以工作。

代码语言:javascript
运行
复制
   with concurrent.futures.ProcessPoolExecutor() as executor:
        phenotypes = [phenotype for i in range(len(file_list))]
        futures = executor.map(get_scores_dataframe, file_list, phenotypes,
                                    chunksize=25)
        filenames = executor.map(os.path.basename, file_list)
        for future, filename  in zip(futures, filenames):
                futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
                              index = False)

正如您所看到的,我为此使用了上下文管理器,并在上下文管理器中使用了可以设置选项chunksizemap()方法。但是,我希望程序在处理完每个数据帧时写入csv文件。上下文管理器似乎一直等到所有作业完成,然后将结果写入csv文件。

你知道我怎样才能做到这一点吗?

EN

Stack Overflow用户

回答已采纳

发布于 2020-10-22 20:58:11

首先,executor.map不返回Future实例,因此您的变量futures命名不当。它返回一个迭代器,该迭代器依次产生对file_list的每个元素应用get_scores_dataframe的返回值。其次,看看下面是如何使用它的,看起来这些返回值是输入文件(它可能与输入参数是同一个文件,也可能不是同一个文件--由于缺少显示的代码,所以不能确定)。此外,使用进程池map函数而不是内置的map函数来获取文件名参数的基本名称似乎有点过分。最后,在您的代码中,它不是futures.to_csv,而是future.to_csv。所以我对你的代码是如何工作的感到困惑。

如果您修改函数get_scores_dataframe以返回一个由dataframe和最初传递的文件名参数组成的元组,那么我们可以使用as_competed按完成顺序处理结果

代码语言:javascript
运行
复制
from concurrent.futures import as_completed
import multiprocessing

with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
    futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
    for future in as_completed(futures):
        # it is assumed return value is tuple: (data frame, original filename argument):
        df, file = future.result()
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)

现在,通过使用submit,您正在失去将作业提交“分块”的能力。我们可以切换到在imap_unordered中使用multiprocessing.Pool。但是imap_unordered只能将单个参数传递给worker函数。因此,如果您能够修改您的worker以更改参数的顺序,我们可以将phenotype作为第一个参数并使用partial (see manual)

代码语言:javascript
运行
复制
import multiprocessing
from functools import partial


POOL_SIZE = multiprocessing.cpu_count() - 1 # leave 1 for main process


def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize


with multiprocessing.Pool(POOL_SIZE) as pool:
    chunksize = compute_chunksize(len(file_list))
    worker = partial(get_scores_dataframe, phenotype)
    # it is assumed that start_processing returns a tuple: (data frame, original filename argument)
    for df, file in pool.imap_unordered(worker, file_list, chunksize):
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)
票数 2
EN
查看全部 1 条回答
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64481843

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档