首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在不使用全局的情况下多线程时收集函数返回值?

如何在不使用全局的情况下多线程时收集函数返回值?
EN

Stack Overflow用户
提问于 2016-11-18 22:18:58
回答 2查看 4.7K关注 0票数 1

因此,我试图找出一个通用的解决方案,它将收集函数中的所有值,并将它们附加到稍后可访问的列表中。这将在concurrent.futuresthreading类型任务期间使用。下面是我使用全局master_list的解决方案

代码语言:javascript
运行
复制
from concurrent.futures import ThreadPoolExecutor

master_list = []
def return_from_multithreaded(func):
    # master_list = []
    def wrapper(*args, **kwargs):
        # nonlocal master_list
        global master_list
        master_list += func(*args, **kwargs)
    return wrapper


@return_from_multithreaded
def f(n):
    return [n]


with ThreadPoolExecutor(max_workers=20) as exec:
    exec.map(f, range(1, 100))

print(master_list)

我想找一个不包括全局的解决方案,也许可以返回作为闭包存储的注释掉的master_list

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-11-18 22:38:59

如果不想使用globals,请不要丢弃map的结果。map返回每个函数返回的值,您只是忽略了它们。通过将map用于其预期目的,这段代码可以变得简单得多:

代码语言:javascript
运行
复制
def f(n):
    return n  # No need to wrap in list

with ThreadPoolExecutor(max_workers=20) as exec:
    master_list = list(exec.map(f, range(1, 100)))

print(master_list)

如果您需要一个显示到目前为止计算结果的master_list (可能是其他线程正在监视它),那么只需将循环显式化:

代码语言:javascript
运行
复制
def f(n):
    return n  # No need to wrap in list

master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    for result in exec.map(f, range(1, 100)):
        master_list.append(result)

print(master_list)

这就是Executor模型所设计的;普通线程并不打算返回值,但是Executors提供了一个在封面下返回值的通道,因此您不必自己管理它。在内部,这是使用某种形式的队列,使用额外的元数据来保持结果的有序,但是您不需要处理这种复杂性;从您的角度来看,它等同于常规的map函数,它恰好是并行化工作。

更新以涵盖处理异常:

当结果被击中时,map将在工人中引发任何异常。因此,正如所写的,如果任何任务失败,第一组代码将不会存储任何内容( list将被部分构造,但当异常发生时将被丢弃)。第二个示例只会在抛出第一个异常之前保存结果,其余的将被丢弃(您必须存储map迭代器并使用一些尴尬的代码来避免它)。如果您需要存储所有成功的结果,忽略失败(或者只是以某种方式记录它们),那么最简单的方法是使用submit创建一个list of Future对象,然后依次或按完成顺序等待它们,将.result()调用包装在try/except中,以避免丢弃好的结果。例如,要按提交顺序存储结果,您可以这样做:

代码语言:javascript
运行
复制
master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    futures = [exec.submit(f, i) for i in range(1, 100)]
    exec.shutdown(False)  # Optional: workers terminate as soon as all futures finish,
                          # rather than waiting for all results to be processed
    for fut in futures:
        try:
            master_list.append(fut.result())
        except Exception:
            ... log error here ...

为了获得更高效的代码,您可以按照完成的顺序检索结果,而不是提交结果,使用concurrent.futures.as_completed在结果完成时急切地检索结果。与以前的代码相比,唯一的变化是:

代码语言:javascript
运行
复制
    for fut in futures:

变成:

代码语言:javascript
运行
复制
    for fut in concurrent.futures.as_completed(futures):

as_completed在完成/取消期货后立即完成/取消期货的工作,而不是推迟到所有较早提交的期货完成并得到处理。

还有更复杂的选项涉及到使用add_done_callback,因此主线程根本不涉及显式处理结果,但这通常是不必要的,而且常常令人困惑,所以最好尽可能避免。

票数 7
EN

Stack Overflow用户

发布于 2016-11-18 22:24:15

我过去曾面临过这样的问题:https://stackoverflow.com/questions/40536287/running-multiple-asynchronous-function-and-get-the-returned-value-of-each-functi。我就是这么做的:

代码语言:javascript
运行
复制
def async_call(func_list):
    """
    Runs the list of function asynchronously.

    :param func_list: Expects list of lists to be of format
        [[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
    :return: List of output of the functions
        [output1, output2, ...]
    """
    def worker(function, f_args, f_kwargs, queue, index):
        """
        Runs the function and appends the output to list, and the Exception in the case of error
        """
        response = {
            'index': index,  # For tracking the index of each function in actual list.
                             # Since, this function is called asynchronously, order in
                             # queue may differ
            'data': None,
            'error': None
        }

        # Handle error in the function call
        try:
            response['data'] = function(*f_args, **f_kwargs)
        except Exception as e:
            response['error'] = e  # send back the exception along with the queue

        queue.put(response)
    queue = Queue()
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \
                    for i, (func, args, kwargs) in enumerate(func_list)]

    for process in processes:
        process.start()

    response_list = []
    for process in processes:
        # Wait for process to finish
        process.join()

        # Get back the response from the queue
        response = queue.get()
        if response['error']:
            raise response['error']   # Raise exception if the function call failed
        response_list.append(response)

    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])]

样本运行:

代码语言:javascript
运行
复制
def my_sum(x, y):
    return x + y

def your_mul(x, y):
    return x*y

my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]]

async_call(my_func_list)
# Value returned: [3, 2]
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40686821

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档