首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python process_map:追加进程之间共享的列表?

Python process_map:追加进程之间共享的列表?
EN

Stack Overflow用户
提问于 2021-06-13 10:34:43
回答 1查看 4.1K关注 0票数 1

我希望共享一个列表来附加来自并行线程的输出,这是由process_maptqdm启动的。(我想使用process_map的原因是很好的进度指示器和max_workers=选项。)

我尝试过使用from multiprocessing import Manager创建共享列表,但是这里我做了一些错误的事情:我的代码打印一个空的shared_list,但是它应该打印一个20个数字的列表,正确的顺序并不重要。

任何帮助都将不胜感激,谢谢您的提前!

代码语言:javascript
运行
复制
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager


shared_list = []

def worker(i):
    global shared_list
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(worker, range(20), max_workers=5)
    print(shared_list)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-06-13 12:27:38

您没有指定在哪个平台上运行(您应该在使用multiprocessing标记问题时使用平台标记您的问题),但是您似乎是在一个使用spawn创建新进程(如Windows)的平台下运行的。这意味着,当启动新进程时,将创建一个空地址空间,启动一个新的Python解释器,并从顶部重新执行源。

因此,虽然在开始为if __name__ == '__main__':分配给shared_list的托管列表的块中,创建的池中的每个进程都将执行shared_list = [],从而破坏您的初始赋值。

您可以将shared_list作为第一个参数传递给worker函数:

代码语言:javascript
运行
复制
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
from functools import partial

def worker(shared_list, i):
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(partial(worker, shared_list), range(20), max_workers=5)
    print(shared_list)

如果process_map以与ProcessPoolExecutor类相同的方式支持初始化器和initargs参数(似乎不支持),那么可以这样做:

代码语言:javascript
运行
复制
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager

def init_pool(the_list):
    global shared_list
    shared_list = the_list

def worker(i):
    time.sleep(1)
    shared_list.append(i)

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list()

    process_map(worker, range(20), max_workers=5, initializer=init_pool, initargs=(shared_list,))
    print(shared_list)

注释

这与原始问题本身没有任何关系,但是对于这种类型的问题,您可能想要考虑使用一个托管列表来代替您的工作函数(巧合地命名为worker)附加元素的托管列表,并且添加元素的顺序是不确定的,因为您对池进程的调度没有任何控制,因此初始化的multiprocessing.Array实例如下:

代码语言:javascript
运行
复制
shared_list = Array('i', [0] * 20, lock=False)

然后你的工人功能变成:

代码语言:javascript
运行
复制
def worker(i):
    time.sleep(1)
    shared_list[i] = i

在这里,数组存储在共享内存中,甚至不需要锁定访问,因为每次调用worker都访问数组的不同索引。访问共享内存阵列的元素比访问托管列表的元素要快得多。唯一的问题是对共享内存的引用不能作为参数传递,而且我们看到process_map不支持初始化器和initargs参数。所以你得用低层次的方法。例如:

代码语言:javascript
运行
复制
import time
from multiprocessing import Pool, Array
from tqdm import tqdm

def init_pool(the_list):
    global shared_list
    shared_list = the_list

def worker(i):
    time.sleep(1)
    shared_list[i] = i

if __name__ == '__main__':
    # Preallocate 20 slots for the array in shared memory
    # And we don't require a lock if each worker invocation is accessing a different Array index:
    args = range(20)
    shared_list = Array('i', [0] * len(args), lock=False)

    with tqdm(total=len(args)) as pbar:
        pool = Pool(5, initializer=init_pool, initargs=(shared_list,))
        for result in pool.imap_unordered(worker, args):
            pbar.update(1)
    # print out elements one at a time:
    for elem in shared_list:
        print(elem)
    # print out all elements at once (must first convert to a regular list):
    print(list(shared_list))

评论2

我会避免使用process_map。此函数基于ProcessPoolExecutor.map方法的ProcessPoolExecutor.map方法,该方法需要返回与可迭代传递的元素对应的顺序,而不是按完成顺序返回结果。想象一下,如果由于某种原因,处理第一个提交的任务的流程,在我们的例子中,i值为0,需要很长的时间来处理,并最终被证明是完成的最后一个任务。在第一次提交的任务完成之前,您将看到tqdm进度条很长一段时间内什么也不做。但是,当发生这种情况时,我们知道所有其他提交的任务已经完成,因此进度栏将从0立即跳转到100%。修改函数worker,如下所示:

代码语言:javascript
运行
复制
def worker(shared_list, i):
    if i == 0:
        time.sleep(5)
    else:
        time.sleep(.25)
    shared_list.append(i)

我上面提供的使用Pool.imap_unordered的代码版本允许按顺序返回结果,默认的块状值为1,它将按完成顺序排列。进度条将更顺利地进行。

评论3

tqdm中似乎也有一个bug。下面的程序演示了如何使用低级别的tqdm调用,这次是concurrent.futures模块。不幸的是,它的ProcessPoolExecutor类(用于多处理)和ThreadPoolExecutor类(用于多线程)没有与imap_unordered方法等效的方法。您必须使用submit方法(其multiprocessing.pool.Pool模拟为apply_async方法),该方法返回一个Future实例,您可以在该实例上调用result方法来阻止完成,并返回提交的任务的结果。您将submit一组任务并将返回的Future实例存储在一个列表中,然后使用as_completed函数调用从该列表返回已完成的下一个已完成的Future实例。

此演示使用线程并创建一个大小为20并提交20个任务的线程池,因此所有任务都应同时启动。worker1的睡眠时间被设置为不同,因此i参数的值越小,睡眠时间就越长。此程序创建池并提交任务4次。第一次,返回值只是打印出来。第二次使用tqdm进度条。结果如你所料。第三次worker2tqdm进度条一起使用。不同的是,对于i != 0的所有值,睡眠时间都是常数(.25秒),因此对于i值1、2、. 19,任务应该在大致相同的时间完成。因此,您希望在很短的时间内看到进度条跳转到95%,然后等待i == 0任务完成。然而,你所观察到的却恰恰相反。进度条到5%,挂在那里很长时间,然后跳转到100%。第四种情况是在我自己的“进度条”中使用worker2,它的行为与您预期的一样。

这是Python3.8.5下的tqdm 4.61.1。我已经在Windows和Linux下对此进行了测试。有人对这种行为有什么解释吗?

代码语言:javascript
运行
复制
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import sys

class MyProgressBar:
    def __init__(self, n_tasks):
        self._task_count = n_tasks
        self._completed = 0
        self.update()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(file=sys.stderr)
        return False

    def update(self, count=0):
        self._completed += count
        print(f'\r{self._completed} of {self._task_count} task(s) completed.', end='', flush=True)

def worker1(i):
    if i == 0:
        time.sleep(8)
    else:
        time.sleep(5 - i/5)
    return i

def worker2(i):
    if i == 0:
        time.sleep(8)
    else:
        time.sleep(.25)
    return i

if __name__ == '__main__':
    args = range(20)

    with ThreadPoolExecutor(max_workers=20) as pool:
        futures = [pool.submit(worker1, arg) for arg in args]
        for future in as_completed(futures):
            print(future.result())

    with ThreadPoolExecutor(max_workers=20) as pool:
        with tqdm(total=len(args)) as pbar:
            futures = [pool.submit(worker1, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)

    with ThreadPoolExecutor(max_workers=20) as pool:
        with tqdm(total=len(args)) as pbar:
            futures = [pool.submit(worker2, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)

    # Works with my progress "bar":
    with ThreadPoolExecutor(max_workers=20) as pool:
        with MyProgressBar(len(args)) as pbar:
            futures = [pool.submit(worker2, arg) for arg in args]
            for future in as_completed(futures):
                future.result()
                pbar.update(1)
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67957266

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档