Python3.7 pool.apply_async似乎对我不起作用?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (2074)

所以我试图同时运行6个进程,作为测试(我有一个128核CPU,所以目标是并行127个进程),在每个进程中我将运行256个线程来完成一些任务。

我认为我接到电话是pool.apply_async错误的,因此一旦电话通过就似乎没有任何事情发生。我遵循https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers中显示的示例,我不明白我犯了什么错误。

这是执行异步调用的代码段

batch_no = 0
ra = []
for worker_ip in worker_ip_list:
    logg.log("debug","attempting to do async process invocation for workload batch ="+str(batch_no))
    r = worker_pool.apply_async(self.run_worker_for_multi_task,(target_function,worker_ip,threads_per_worker,))
    ra.append(r)
    try:
        logg.log("debug","work pool async call ready status ="+str(r.successful()))
    except Exception:
        logg.log_stacktrace()
    batch_no = batch_no + 1

开头self.run_worker_for_multi_task有一些日志语句,但我没有看到它们中的任何一个被执行。

这是方法的开始。

    def run_worker_for_multi_task(self,tf,worker_ip_list,thread_batch_size):
        l = self.logger.log
        worker_output = Queue()
        l("info","started worker process with PID="+str(os.getpid()))
        l("info","thread batch size is = "+str(thread_batch_size))
        l("debug","creating thread batches...")
...

但这是我得到的输出。

Thu Oct 18 15:38:22 2018 -- INFO -- [directory watcher] directory watching running a scan cycle.
Thu Oct 18 15:38:23 2018 -- DEBUG -- Process Tracker Initialized
Thu Oct 18 15:38:23 2018 -- DEBUG -- [process tracker] {'app_pid': 36935}
Thu Oct 18 15:38:23 2018 -- INFO -- number of workers set to 6
Thu Oct 18 15:38:23 2018 -- INFO -- number of threads per worker set to 256
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size is - 134208
Thu Oct 18 15:38:23 2018 -- DEBUG -- workload size per worker is going to be - 22368
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =0
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bda0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =1
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7bdd8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =2
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb7be48> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =3
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b6a0> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =4
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b710> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- attempting to do async process invocation for workload batch =5
Thu Oct 18 15:38:23 2018 -- STACK -- Traceback (most recent call last):\n  File "/Users/anupam/PycharmProjects/MultimediaLibrary/core/TaskTracker.py", line 63, in multi_task\n    logg.log("debug","work pool async call ready status ="+str(r.successful()))\n  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 644, in successful\n    raise ValueError("{0!r} not ready".format(self))\nValueError: <multiprocessing.pool.ApplyResult object at 0x10cb8b7b8> not ready\n
Thu Oct 18 15:38:23 2018 -- DEBUG -- all workers completed. shared output data returned by all workers is --
Thu Oct 18 15:38:23 2018 -- DEBUG -- {}

同时我ps -ef | grep -i python在连续循环中运行命令但是在运行代码时我没有看到python进程有任何增加。

而且我知道这self.run_worker_for_multi_task很有效,因为当我通过调用调用它时,我能够从中获得预期的行为Process.start()。问题Process.start()是它阻止并阻止其他进程启动直到进程加入。

即,下面的代码不会运行它并行的进程列表。它在第一次process.start()通话时被阻止

logg.log("debug","creating workers...")
for worker_ip in worker_ip_list:
    worker_inst = Process(target=self.__run_worker_for_multi_task,args=(target_function,worker_ip,q,threads_per_worker,))
    worker_list.append(worker_inst)
logg.log("debug","workers created.")
logg.log("debug","starting workers.")
for worker_inst in worker_list:
    worker_inst.start()
    logg.log("info","starting worker "+str(worker_inst) +" with pid="+str(worker_inst.pid))
logg.log("debug","workers are started")
logg.log("debug","waiting for all workers to complete their tasks")
for worker_inst in worker_list:
    worker_inst.join()

我在这里想念的是什么?为什么我看不到调用六个进程并查看目标的日志语句?如何在多个进程中并行运行该函数?

提问于
用户回答回答于

所以我仍然不知道游泳池的Async会发生什么。但是,我想为什么进程被阻止而且Process.start()无法正常工作。目标必须在公共范围内,否则上下文不能与其他进程共享。因此,将我的目标更改为公共方法就可以了。

热门问答

cos.sliceUploadFile支持断点续传吗?

如果用的是 cos-js-sdk,那么 cos.restartTask 是会断点续传的,用法没有问题。 PS: sdk 使用可以参考 demo.js https://github.com/tencentyun/cos-js-sdk-v5/blob/master/demo/demo...... 展开详请

使用独立H5接入人脸核身,在微信浏览器拍摄视频按钮无法点击?

旺仔小小鹿

社区 · 运营 (已认证)

Less is more
推荐

使用iframe会有问题 ,微信有限制,不允许使用iframe调用jsapi摄像头 ,微信里,不能用iframe

ios应该都不行的,安卓需要看是什么浏览器。

云服务器中ping不可达,请教一下如何恢复?

推荐已采纳
本地主机 ping 不通实例可能由以下问题导致: 目标服务器的设置不正确 域名没有正确解析 链路故障 在确保本地网络正常的前提下(即您可以正常 ping 通其他网站),可根据以下操作进行排查: 检查实例是否配置公网 IP 检查安全组设置 检查系统设置 检查域名是否备案 检查域名解...... 展开详请

为什么加固之后生成四个文件?

腾讯云@移动安全

腾讯 · 移动开发工程师 (已认证)

腾讯云移动安全前端开发
推荐

选择最后一个_legu_aligned_signed.apk 文件,这个是加固并已重签名的文件。

COS Javascript SDK 为何没有 getService 方法?

因为 getService 请求的是 service.cos.myqcloud.com 或 cos.<Region>.myqcloud.com 域名,前端直接请求会导致跨域问题。 前端 js sdk 直接请求 bucket/object 相关的接口,虽然也会跨域,但你可以在 你...... 展开详请

iot设备通过mqtt协议连接,没有办法设置clientid?

DylanRichard

腾讯 · 产品经理 (已认证)

万物互联的时代,欢迎来到IoT的世界
推荐

物联网接入层有设备互踢的逻辑,如果是用同一个设备 ID 在不同地方登录,会导致其中一方被另一方踢下线。因此发现设备一直上下线时,需要确认是否有不同的人或者多线程在使用同一个设备 ID 执行登录操作。

所属标签

扫码关注云+社区

领取腾讯云代金券