首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何将multiprocessing.Pool实例传递给apply_async回调函数?

如何将multiprocessing.Pool实例传递给apply_async回调函数?
EN

Stack Overflow用户
提问于 2017-08-30 09:57:26
回答 3查看 5.5K关注 0票数 19

这是我的主要分解程序,我在pool.apply_async(findK, args=(N,begin,end))中添加了一个回调函数,当分解结束时,prime factorization is over会提示出一条消息,它工作得很好。

import math
import multiprocessing 

def findK(N,begin,end):
    for k in range(begin,end):
        if N% k == 0:
            print(N,"=" ,k ,"*", N/k)
            return True
    return False


def prompt(result):
    if result:
        print("prime factorization is over")


def mainFun(N,process_num):
    pool = multiprocessing.Pool(process_num)
    for i in range(process_num):
        if i ==0 :
            begin =2
        else:
            begin = int(math.sqrt(N)/process_num*i)+1
        end = int(math.sqrt(N)/process_num*(i+1))
        pool.apply_async(findK, args=(N,begin,end) , callback = prompt)    
    pool.close()
    pool.join()    

if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N,process_num)

现在我想更改apply_async中的回调函数,将prompt更改为shutdown函数,以杀死所有其他进程。

def prompt(result):
    if result:
        pool.terminate()

未在prompt作用域中定义池实例或未将其传入prompt中。

pool.terminate()不能在提示函数中工作。

如何将multiprocessing.Pool实例传递给apply_async的回调函数?

(我已经用类的格式做了,只要添加一个类方法,调用self.pool.terminate就可以杀死所有其他进程,怎么做函数格式的工作呢?)

如果没有将pool设置为全局变量,那么pool是否可以传入回调函数?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2017-09-01 15:52:00

不支持向回调函数传递额外的参数。然而,您有很多优雅的方法来解决这个问题。

您可以将池逻辑封装到一个对象中:

class Executor:
    def __init__(self, process_num):
        self.pool = multiprocessing.Pool(process_num)

    def prompt(self, result):
        if result:
            print("prime factorization is over")
            self.pool.terminate()

    def schedule(self, function, args):
        self.pool.apply_async(function, args=args, callback=self.prompt)

    def wait(self):
        self.pool.close()
        self.pool.join() 


def main(N,process_num):
    executor = Executor(process_num)
    for i in range(process_num):
        ...
        executor.schedule(findK, (N,begin,end))   
    executor.wait()

或者,您可以使用返回Future对象的concurrent.futures.Executor实现。在设置回调之前,只需将池附加到Future对象即可。

def prompt(future):
    if future.result():
        print("prime factorization is over")
        future.pool_executor.shutdown(wait=False)

def main(N,process_num):
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=process_num)
    for i in range(process_num):
        ...
        future = executor.submit(findK, N,begin,end)
        future.pool_executor = executor
        future.add_done_callback(prompt)
票数 12
EN

Stack Overflow用户

发布于 2017-09-01 15:18:17

您可以简单地将本地close函数定义为回调:

import math
import multiprocessing 


def findK(N, begin, end):
    for k in range(begin, end):
        if N % k == 0:
            print(N, "=", k, "*", N / k)
            return True
    return False


def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    def close(result):
        if result:
            print("prime factorization is over")
            pool.terminate()
    for i in range(process_num):
        if i == 0:
            begin = 2
        else:
            begin = int(math.sqrt(N) / process_num * i) + 1
        end = int(math.sqrt(N) / process_num * (i + 1))
        pool.apply_async(findK, args=(N, begin, end), callback=close)
    pool.close()
    pool.join()


if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N, process_num)

您还可以使用functool中的partial函数,

import functools

def close_pool(pool, results):
    if result:
        pool.terminate()

def mainFun(N, process_num):
    pool = multiprocessing.Pool(process_num)

    close = funtools.partial(close_pool, pool)
....
票数 6
EN

Stack Overflow用户

发布于 2017-08-30 10:11:11

你需要让poolprompt的环境中结束。一种可能性是将pool移到全局作用域中(尽管这不是真正的最佳实践)。这似乎是可行的:

import math
import multiprocessing 

pool = None

def findK(N,begin,end):
    for k in range(begin,end):
        if N% k == 0:
            print(N,"=" ,k ,"*", N/k)
            return True
    return False


def prompt(result):
    if result:
        print("prime factorization is over")
        pool.terminate()


def mainFun(N,process_num):
    global pool
    pool = multiprocessing.Pool(process_num)
    for i in range(process_num):
        if i ==0 :
            begin =2
        else:
            begin = int(math.sqrt(N)/process_num*i)+1
        end = int(math.sqrt(N)/process_num*(i+1))
        pool.apply_async(findK, args=(N,begin,end) , callback = prompt)    
    pool.close()
    pool.join()    

if __name__ == "__main__":
    N = 684568031001583853
    process_num = 16
    mainFun(N,process_num)
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45950741

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档