
Python 3.7: pool.apply_async doesn't seem to work for me

Stack Overflow user
Asked 2018-10-19 04:49:44
1 answer, 692 views, 0 followers, 0 votes

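I'm trying to run 6 processes concurrently as a test (I have a 128-core CPU, so the eventual goal is 127 parallel processes), and within each process I'll run 256 threads to perform some tasks.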

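I think I'm calling pool.apply_async incorrectly, because once the call is made nothing seems to happen. I'm following the example shown at https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers, but I can't see what mistake I've made.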

Here is the snippet that makes the async calls:

batch_no = 0
ra = []
for worker_ip in worker_ip_list:
    logg.log("debug","attempting to do async process invocation for workload batch ="+str(batch_no))
    r = worker_pool.apply_async(self.run_worker_for_multi_task,(target_function,worker_ip,threads_per_worker,))
    ra.append(r)
    try:
        logg.log("debug","work pool async call ready status ="+str(r.successful()))
    except Exception:
        logg.log_stacktrace()
    batch_no = batch_no + 1

There are some log statements at the start of self.run_worker_for_multi_task, but I don't see any of them being executed.

Here is the beginning of that method:

    def run_worker_for_multi_task(self,tf,worker_ip_list,thread_batch_size):
        l = self.logger.log
        worker_output = Queue()
        l("info","started worker process with PID="+str(os.getpid()))
        l("info","thread batch size is = "+str(thread_batch_size))
        l("debug","creating thread batches...")
...

But this is the output I get:

Thu Oct 18 15:38:22 2018 -- INFO -- [directory watcher] directory watching running a scan cycle.
Thu Oct 18 15:38:23 2018 -- DEBUG -- Process Tracker Initialized
Thu Oct 18 15:38:23 2018 -- DEBUG -- [process tracker] {'app_pid': 36935}
Thu Oct 18 15:38:23 2018 -- INFO -- number of workers set to 6
Thu Oct 18 15:38:23 2018 -- INFO -- number of threads per worker set to 256
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size is - 134208
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size per worker is going to be - 22368
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =0
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bda0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =1
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bdd8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =2
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7be48> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =3
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b6a0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =4
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b710> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =5
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b7b8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- all workers completed. shared output data returned by all workers is --
Thu Oct 18 15:38:23 2018 -- DEBUG -- {}
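
A note on the traceback in this output: the ValueError is raised by successful() itself, because in Python 3.7 that method raises when the task has not finished yet, and here it is called immediately after apply_async. The following is a minimal sketch of checking a result without triggering that error, and of waiting for all results with get(). The names slow_square, status_of and run_batches are hypothetical stand-ins and not part of the code in the question.

```python
import multiprocessing
import time


def slow_square(x):
    """Hypothetical stand-in for a real worker task."""
    time.sleep(0.2)
    return x * x


def status_of(result):
    """Describe an AsyncResult without raising while it is still pending."""
    if not result.ready():
        return "pending"
    # successful() is only safe to call once ready() is True
    return "ok" if result.successful() else "failed"


def run_batches(pool, inputs):
    """Submit one task per input, then wait for every result."""
    results = [pool.apply_async(slow_square, (x,)) for x in inputs]
    # get() blocks until the task finishes and re-raises any worker exception
    return [r.get(timeout=30) for r in results]


if __name__ == "__main__":
    with multiprocessing.Pool(processes=3) as pool:
        print(run_batches(pool, [1, 2, 3]))
```

Waiting with get() also matters because leaving the pool (or the program) before the results are collected can end the workers before they have produced anything.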

Meanwhile, I was running ps -ef | grep -i python in a continuous loop, but I never saw any additional python processes appear while the code ran.

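I know self.run_worker_for_multi_task works fine, because when I invoke it through a Process.start() call I get the expected behavior. The problem with Process.start() is that it blocks and prevents the other processes from starting until the process is joined.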

That is, the following code does not run the list of processes in parallel. It blocks at the first process.start() call:

logg.log("debug","creating workers...")
for worker_ip in worker_ip_list:
    worker_inst = Process(target=self.__run_worker_for_multi_task,args=(target_function,worker_ip,q,threads_per_worker,))
    worker_list.append(worker_inst)
logg.log("debug","workers created.")
logg.log("debug","starting workers.")
for worker_inst in worker_list:
    worker_inst.start()
    logg.log("info","starting worker "+str(worker_inst) +" with pid="+str(worker_inst.pid))
logg.log("debug","workers are started")
logg.log("debug","waiting for all workers to complete their tasks")
for worker_inst in worker_list:
    worker_inst.join()

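What am I missing here? Why don't I see 6 processes being invoked, along with the log statements from the target? How do I run a function in parallel across multiple processes?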


1 Answer

Stack Overflow user

Answered 2018-10-19 06:41:59

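So I still don't know what is going on with the pool's apply_async. However, I figured out why the process was blocking and Process.start() wasn't working. The target must be in a public scope, otherwise the context can't be shared with the other process. So changing the target to a public method did the trick.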
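
On the still-open pool question, one possibility (an assumption, not a confirmed diagnosis of the code above) is that a failure inside the task stayed hidden because nothing ever called get() on the results. The sketch below shows one way to surface such failures through the error_callback parameter of apply_async. The names failing_task and submit_all are hypothetical and used only for illustration.

```python
import multiprocessing


def failing_task(x):
    """Hypothetical worker that fails for negative input."""
    if x < 0:
        raise ValueError("negative input: " + str(x))
    return x * 2


def submit_all(pool, inputs):
    """Submit tasks and collect results and errors through callbacks."""
    successes, errors = [], []
    handles = [
        pool.apply_async(
            failing_task,
            (x,),
            callback=successes.append,     # called in the parent on success
            error_callback=errors.append,  # called in the parent on failure
        )
        for x in inputs
    ]
    for handle in handles:
        handle.wait()  # block until finished; unlike get(), does not re-raise
    return sorted(successes), errors


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        done, failed = submit_all(pool, [1, -1, 3])
    print("results:", done)
    print("errors:", failed)
```

With a callback in place, an exception raised in a worker shows up in the parent's log instead of disappearing, which makes it easier to tell a task that never ran from one that ran and failed.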

0 votes
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/52882344
