我已经编写了一个python函数,它使用提供的目录模式遍历文件系统,并在每个级别提供可选的“操作”。然后我尝试了多线程,因为一些卷在网络共享上,并且我想最大限度地减少IO阻塞。我从使用多进程Pool类开始,因为这是最方便的……(说真的,没有线程的Pool类?)我的函数尽可能地解开提供的FS模式,并将新返回的路径提交到池中,直到没有新路径返回为止。当我直接使用函数和类时,我得到了很好的效果,但现在我试图从另一个类中使用这个函数,但我的程序似乎挂起了。为了简化,我使用线程而不是进程重写了函数,甚至编写了一个简单的ThreadPool类……同样的问题。以下是代码的一个非常简化的版本,它仍然显示出相同的问题:
file test1.py:
------------------------------------------------
import os
import glob
from multiprocessing import Pool
def mapGlob(pool,paths,pattern):
results = []
paths = [os.path.join(p,pattern) for p in paths]
for result in pool.map(glob.glob,paths):
results += result
return results
def findAllMyPaths():
pool = Pool(10)
paths = ['/Volumes']
follow = ['**','ptid_*','expid_*','slkid_*']
for pattern in follow:
paths = mapGlob(pool,paths,pattern)
return paths
file test2.py:
----------------------------------------------------------------------------
from test1 import findAllMyPaths
allmypaths = findAllMyPaths()现在如果我打电话给
>>>from test1 import findAllMyPaths
>>>findAllMyPaths()
>>>...long list of all the paths这可以很好地工作,但如果尝试:
>>>from test2 import allmypathspython永远挂起。动作函数被调用(在本例中为glob),但它们似乎永远不会返回...我需要帮助请..。当它正常工作时,并行版本的运行速度要快得多(6-20倍的速度取决于FS树中每个点上映射的‘action’),所以我希望能够使用它。
另外,如果我将映射函数更改为非并行版本:
def mapGlob(pool,paths,pattern):
results = []
paths = [os.path.join(p,pattern) for p in paths]
for path in paths:
results += glob.glob(path)
return results一切运行正常。
编辑:
我打开了多进程中的调试,看看这是否能进一步帮助我。在它工作的情况下,我得到:
[DEBUG/MainProcess] created semlock with handle 5
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 9
[DEBUG/MainProcess] created semlock with handle 10
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] closing pool
[SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>, <multiprocessing.queues.SimpleQueue object at 0x3494950>, <multiprocessing.queues.SimpleQueue object at 0x34a61b0>, [<Process(PoolWorker-1, started daemon)>, <Process(PoolWorker-2, started daemon)>, <Process(PoolWorker-3, started daemon)>, <Process(PoolWorker-4, started daemon)>, <Process(PoolWorker-5, started daemon)>, <Process(PoolWorker-6, started daemon)>, <Process(PoolWorker-7, started daemon)>, <Process(PoolWorker-8, started daemon)>, <Process(PoolWorker-9, started daemon)>, <Process(PoolWorker-10, started daemon)>], <Thread(Thread-1, started daemon -1341648896)>, <Thread(Thread-2, started daemon -1341116416)>, {}) and kwargs {}
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-2] worker got sentinel -- exiting
[DEBUG/PoolWorker-1] worker got sentinel -- exiting
[INFO/PoolWorker-2] process shutting down
[DEBUG/PoolWorker-7] worker got sentinel -- exiting
[INFO/PoolWorker-1] process shutting down
[INFO/PoolWorker-7] process shutting down
[DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
[INFO/PoolWorker-7] process exiting with exitcode 0
[DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
[INFO/PoolWorker-1] process exiting with exitcode 0
[DEBUG/PoolWorker-5] worker got sentinel -- exiting
[DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
[INFO/PoolWorker-5] process shutting down
[DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
[DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
[INFO/PoolWorker-2] process exiting with exitcode 0
[INFO/PoolWorker-5] process exiting with exitcode 0
[DEBUG/PoolWorker-6] worker got sentinel -- exiting
[INFO/PoolWorker-6] process shutting down
[DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
[INFO/PoolWorker-6] process exiting with exitcode 0
[DEBUG/PoolWorker-4] worker got sentinel -- exiting
[DEBUG/PoolWorker-9] worker got sentinel -- exiting
[INFO/PoolWorker-9] process shutting down
[DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
[INFO/PoolWorker-9] process exiting with exitcode 0
[INFO/PoolWorker-4] process shutting down
[DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
[INFO/PoolWorker-4] process exiting with exitcode 0
[DEBUG/PoolWorker-10] worker got sentinel -- exiting
[INFO/PoolWorker-10] process shutting down
[DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
[INFO/PoolWorker-10] process exiting with exitcode 0
[DEBUG/PoolWorker-8] worker got sentinel -- exiting
[INFO/PoolWorker-8] process shutting down
[DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
[INFO/PoolWorker-8] process exiting with exitcode 0
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[INFO/PoolWorker-3] process shutting down
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[INFO/PoolWorker-3] process exiting with exitcode 0
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers当它没有的时候,我得到的就是:
[DEBUG/MainProcess] created semlock with handle 6
[DEBUG/MainProcess] created semlock with handle 7
[DEBUG/MainProcess] created semlock with handle 10
[DEBUG/MainProcess] created semlock with handle 11
[INFO/PoolWorker-1] child process calling self.run()
[INFO/PoolWorker-2] child process calling self.run()
[INFO/PoolWorker-3] child process calling self.run()
[INFO/PoolWorker-8] child process calling self.run()
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-9] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[INFO/PoolWorker-7] child process calling self.run()
[INFO/PoolWorker-10] child process calling self.run()发布于 2011-01-28 04:39:57
这不是一个完整的解决方案,但我找到了一种方法,可以使代码以任何一种形式工作:从解释器或作为正在运行的脚本中的代码。我认为这个问题与多进程文档中的以下说明有关:
此包中的功能要求main方法可由子级导入。这在编程指南中有所涉及,但在这里值得指出。这意味着一些示例,例如multiprocessing.Pool示例将不能在交互式解释器中工作。
我不确定为什么存在这个限制,为什么我有时仍然可以使用交互式解释器中的池,有时不可以,但是好吧……
为了解决这个问题,我在任何可能使用多进程的模块中执行以下操作:
import __main__
__SHOULD_MULTITHREAD__ = False
if hasattr(__main__,'__file__'):
__SHOULD_MULTITHREAD__ = True然后,该模块中的其余代码可以检查此标志,以确定它是否应该使用池,或者只执行而不进行并行处理。这样做,我仍然可以在交互式解释器的模块中使用和测试并行化函数,它们只是运行得更慢。
发布于 2011-01-26 06:18:01
如果我没有错,那么test2.py不应该是这样的吗?
from test1 import findAllMyPaths
allmypaths = findAllMyPaths然后
来自test2的
导入allmypaths()
https://stackoverflow.com/questions/4798912
复制相似问题