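Hi, I'm referring to the question below because it is similar to what I'm trying to achieve. However, I'm running into an error that I can't find an answer to, so I'm looking for some help.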
Combining multithreading and multiprocessing with concurrent.futures
Here is my test code:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)

def test(x):
    x**2

def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
Process SpawnProcess-31:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>
Process SpawnProcess-32:
Traceback (most recent call last):
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\concurrent\futures\process.py", line 237, in _process_worker
    call_item = call_queue.get(block=True)
  File "C:\Users\Test_user\Anaconda3\envs\test_env\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'multithread' on <module '__main__' (built-in)>
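So, I haven't specified the number of threads (I don't see a reason to for the thread pool executor). I'm having a hard time understanding what the error actually means and how I can fix it. Any help would be appreciated.
Posted on 2022-09-13 17:03:05
This error probably stems from the fact that multithread() is being called incorrectly.
Try this: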
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import numpy as np
from os import cpu_count
from functools import partial

num_list = range(0,1000)

def test(x):
    x**2

def multithread(f, lst):
    print('Thread running')
    with ThreadPoolExecutor() as thread_executor:
        thread_executor.map(f, lst)

def multiprocesser(lst, f, n_processors=cpu_count()//2):
    chunks = np.array_split(lst, n_processors)
    with ProcessPoolExecutor(max_workers=n_processors) as mp_executor:
        mp_executor.map(partial(multithread, f), chunks)

if __name__ == '__main__':
    multiprocesser(num_list, test)
Posted on 2022-09-13 17:54:02
Missing if __name__ == '__main__'
if __name__ == '__main__':
    multiprocesser(num_list, test)
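Unintended recursion
When you don't guard the call to multiprocesser(), the call is repeated whenever a child process runs the Python script, which leads to recursion.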
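To make the re-import visible, here is a minimal sketch that is not part of the original answer. It writes a small script to a temporary file and runs it in a fresh interpreter with the spawn start method forced. The file name spawn_demo.py and the function work() are made up for illustration.

```python
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical demo script, written to a real file so that a spawned
# child process can re-import it by path.
DEMO_SOURCE = textwrap.dedent('''
    import multiprocessing as mp

    # Module-level code: runs on every import, including in spawned children.
    print(f"module loaded as {__name__}", flush=True)

    def work():
        print("work() ran in the child", flush=True)

    if __name__ == "__main__":
        # Guarded code: runs only in the process started from the command line.
        ctx = mp.get_context("spawn")
        child = ctx.Process(target=work)
        child.start()
        child.join()
''')

def run_demo():
    """Run the demo script in a fresh interpreter and return its output lines."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "spawn_demo.py"
        script.write_text(DEMO_SOURCE)
        result = subprocess.run(
            [sys.executable, str(script)],
            capture_output=True, text=True, check=True,
        )
    return result.stdout.splitlines()

output_lines = run_demo()
for line in output_lines:
    print(line)
```

The module-level print should appear twice, once with the name __main__ and once with __mp_main__, because the spawned child imports the script again under a different name. Anything at module level, including an unguarded call that starts processes, would therefore be executed again in the child, while the guarded block is skipped there.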
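Safe importing of main module
Here is an example of the same kind of problem from the multiprocessing documentation:
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such as starting a new process).
For example, using the spawn or forkserver start method, running the following module would fail with a RuntimeError: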
from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()
Instead one should protect the "entry point" of the program by using if __name__ == '__main__': as follows:
from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()
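As a side note on the message in the traceback ("Can't get attribute 'multithread' on <module '__main__' ...>"): a function sent to a worker process is pickled as a reference (module name plus function name), not as code, so the worker has to find that name in its own copy of the module. The sketch below is only an illustration of that lookup. It uses textwrap.dedent as a stand-in function and a deliberately broken name, dedenx, which is made up; it does not reproduce the asker's setup.

```python
import pickle
import textwrap

# A function pickles as a reference (module name + function name), not as code.
payload = pickle.dumps(textwrap.dedent, protocol=4)
print(b"textwrap" in payload, b"dedent" in payload)

# Unpickling resolves the reference by importing the module and looking up the name.
restored = pickle.loads(payload)
print(restored is textwrap.dedent)

# Simulate a receiver whose module lacks that name by pointing the reference
# at a hypothetical, nonexistent function name of the same length.
broken_payload = payload.replace(b"dedent", b"dedenx")
try:
    pickle.loads(broken_payload)
    lookup_error = None
except AttributeError as exc:
    lookup_error = exc

print(type(lookup_error).__name__, lookup_error)
```

If the worker's __main__ module does not contain multithread, for example because the main module could not be re-imported there, the same kind of lookup fails in _ForkingPickler.loads, which is the last frame shown in the traceback.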
https://stackoverflow.com/questions/73706309