因此,我试图找出一个通用的解决方案,它将收集函数中的所有值,并将它们附加到稍后可访问的列表中。这将在concurrent.futures或threading类型任务期间使用。下面是我使用全局master_list的解决方案
from concurrent.futures import ThreadPoolExecutor
master_list = []
def return_from_multithreaded(func):
    # master_list = []
    def wrapper(*args, **kwargs):
        # nonlocal master_list
        global master_list
        master_list += func(*args, **kwargs)
    return wrapper
@return_from_multithreaded
def f(n):
    return [n]
with ThreadPoolExecutor(max_workers=20) as exec:
    exec.map(f, range(1, 100))
print(master_list)我想找一个不包括全局的解决方案,也许可以返回作为闭包存储的注释掉的master_list?
发布于 2016-11-18 22:38:59
如果不想使用globals,请不要丢弃map的结果。map返回每个函数返回的值,您只是忽略了它们。通过将map用于其预期目的,这段代码可以变得简单得多:
def f(n):
    return n  # No need to wrap in list
with ThreadPoolExecutor(max_workers=20) as exec:
    master_list = list(exec.map(f, range(1, 100)))
print(master_list)如果您需要一个显示到目前为止计算结果的master_list (可能是其他线程正在监视它),那么只需将循环显式化:
def f(n):
    return n  # No need to wrap in list
master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    for result in exec.map(f, range(1, 100)):
        master_list.append(result)
print(master_list)这就是Executor模型所设计的;普通线程并不打算返回值,但是Executors提供了一个在封面下返回值的通道,因此您不必自己管理它。在内部,这是使用某种形式的队列,使用额外的元数据来保持结果的有序,但是您不需要处理这种复杂性;从您的角度来看,它等同于常规的map函数,它恰好是并行化工作。
更新以涵盖处理异常:
当结果被击中时,map将在工人中引发任何异常。因此,正如所写的,如果任何任务失败,第一组代码将不会存储任何内容( list将被部分构造,但当异常发生时将被丢弃)。第二个示例只会在抛出第一个异常之前保存结果,其余的将被丢弃(您必须存储map迭代器并使用一些尴尬的代码来避免它)。如果您需要存储所有成功的结果,忽略失败(或者只是以某种方式记录它们),那么最简单的方法是使用submit创建一个list of Future对象,然后依次或按完成顺序等待它们,将.result()调用包装在try/except中,以避免丢弃好的结果。例如,要按提交顺序存储结果,您可以这样做:
master_list = []
with ThreadPoolExecutor(max_workers=20) as exec:
    futures = [exec.submit(f, i) for i in range(1, 100)]
    exec.shutdown(False)  # Optional: workers terminate as soon as all futures finish,
                          # rather than waiting for all results to be processed
    for fut in futures:
        try:
            master_list.append(fut.result())
        except Exception:
            ... log error here ...为了获得更高效的代码,您可以按照完成的顺序检索结果,而不是提交结果,使用concurrent.futures.as_completed在结果完成时急切地检索结果。与以前的代码相比,唯一的变化是:
    for fut in futures:变成:
    for fut in concurrent.futures.as_completed(futures):as_completed在完成/取消期货后立即完成/取消期货的工作,而不是推迟到所有较早提交的期货完成并得到处理。
还有更复杂的选项涉及到使用add_done_callback,因此主线程根本不涉及显式处理结果,但这通常是不必要的,而且常常令人困惑,所以最好尽可能避免。
发布于 2016-11-18 22:24:15
我过去曾面临过这样的问题:https://stackoverflow.com/questions/40536287/running-multiple-asynchronous-function-and-get-the-returned-value-of-each-functi。我就是这么做的:
def async_call(func_list):
    """
    Runs the list of function asynchronously.
    :param func_list: Expects list of lists to be of format
        [[func1, args1, kwargs1], [func2, args2, kwargs2], ...]
    :return: List of output of the functions
        [output1, output2, ...]
    """
    def worker(function, f_args, f_kwargs, queue, index):
        """
        Runs the function and appends the output to list, and the Exception in the case of error
        """
        response = {
            'index': index,  # For tracking the index of each function in actual list.
                             # Since, this function is called asynchronously, order in
                             # queue may differ
            'data': None,
            'error': None
        }
        # Handle error in the function call
        try:
            response['data'] = function(*f_args, **f_kwargs)
        except Exception as e:
            response['error'] = e  # send back the exception along with the queue
        queue.put(response)
    queue = Queue()
    processes = [Process(target=worker, args=(func, args, kwargs, queue, i)) \
                    for i, (func, args, kwargs) in enumerate(func_list)]
    for process in processes:
        process.start()
    response_list = []
    for process in processes:
        # Wait for process to finish
        process.join()
        # Get back the response from the queue
        response = queue.get()
        if response['error']:
            raise response['error']   # Raise exception if the function call failed
        response_list.append(response)
    return [content['data'] for content in sorted(response_list, key=lambda x: x['index'])]样本运行:
def my_sum(x, y):
    return x + y
def your_mul(x, y):
    return x*y
my_func_list = [[my_sum, [1], {'y': 2}], [your_mul, [], {'x':1, 'y':2}]]
async_call(my_func_list)
# Value returned: [3, 2]https://stackoverflow.com/questions/40686821
复制相似问题