首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >使用并发未来Python3.5处理大文件的最快方法

使用并发未来Python3.5处理大文件的最快方法
EN

Stack Overflow用户
提问于 2017-03-22 10:22:28
回答 1查看 2.1K关注 0票数 2

我正在尝试使用并发期货来掌握多线程/多处理。

我已经尝试使用以下几组代码。我知道我总是会遇到磁盘IO问题,但我希望尽可能地最大化我的ram和CPU使用率。

对于大规模处理,最常用/最好的方法是什么?

如何使用并发期货来处理大型数据集?

是否有比下面的方法更受欢迎的方法?

方法1:

代码语言:javascript
复制
for folders in os.path.isdir(path):
    p = multiprocessing.Process(pool.apply_async(process_largeFiles(folders)))
    jobs.append(p)
    p.start()

方法二:

代码语言:javascript
复制
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 100)

方法3:

代码语言:javascript
复制
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for folders in os.path.isdir(path):
        executor.submit(process_largeFiles(folders), 10)

我应该尝试同时使用进程池和线程池吗?

方法(思想):

代码语言:javascript
复制
with concurrent.futures.ProcessPoolExecutor(max_workers=10) as process:
     with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread:
          for folders in os.path.isdir(path):
              process.submit(thread.submit(process_largeFiles(folders), 100),10)

在最广泛的用例中最大化我的内存和cpu的最有效的方法是什么?

我意识到启动进程需要一点时间,但与正在处理的文件的大小相比,它会更重要吗?

EN

回答 1

Stack Overflow用户

发布于 2018-07-08 02:43:56

使用TreadPoolExecutor打开并读取文件,然后使用ProcessPoolExecutor处理数据。

代码语言:javascript
复制
import concurrent.futures
from collections import deque

TPExecutor = concurrent.futures.ThreadPoolExecutor
PPExecutor = concurrent.futures.ProcessPoolExecutor
def get_file(path):
    with open(path) as f:
        data = f.read()
    return data

def process_large_file(s):
    return sum(ord(c) for c in s)

files = [filename1, filename2, filename3, filename4, filename5,
         filename6, filename7, filename8, filename9, filename0]

results = []
completed_futures = collections.deque()

def callback(future, completed=completed_futures):
    completed.append(future)

with TPExecutor(max_workers = 4) as thread_pool_executor:
    data_futures = [thread_pool_executor.submit(get_file, path) for path in files]
with PPExecutor() as process_pool_executor:
    for data_future in concurrent.futures.as_completed(data_futures):
        future = process_pool_executor.submit(process_large_file, data_future.result())
        future.add_done_callback(callback)
        # collect any that have finished
        while completed_futures:
            results.append(completed_futures.pop().result())

使用了一个已完成的回调,这样它就不必等待完成的未来。我不知道这是如何影响效率的--它主要用来简化as_completed循环中的逻辑/代码。

如果由于内存限制而需要限制文件或数据提交,则需要对其进行重构。根据文件读取时间和处理时间,很难说在任何给定时刻内存中会有多少数据。我认为在as_completed中收集结果应该有助于缓解这种情况。在设置ProcessPoolExecutor时,data_futures可能会开始完成-该序列可能需要优化。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42941584

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档