首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >python多处理卡住(可能正在读取csv)

python多处理卡住(可能正在读取csv)
EN

Stack Overflow用户
提问于 2018-08-18 02:20:41
回答 1查看 223关注 0票数 0

我正在尝试学习如何使用multiprocessing,但我遇到了一个问题。

我正在尝试运行以下代码:

代码语言:javascript
复制
import multiprocessing as mp
import random
import string

random.seed(123)

# Define an output queue
output = mp.Queue()

# define a example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]

# Run processes
for p in processes:
    p.start()

# Exit the completed processes
for p in processes:
    p.join()

# Get process results from the output queue
results = [output.get() for p in processes]

print(results)

来自here

代码本身运行正常,但是当我用我的函数替换rand_string (读取Pandas数据帧中的一堆csv文件)时,代码永远不会结束。

函数是这样的:

代码语言:javascript
复制
def readMyCSV(clFile):

    aClTable = pd.read_csv(clFile)

    # I do some processing here, but at the end the 
    # function returns a Pandas DataFrame

    return(aClTable)

然后我包装了这个函数,以便它允许在参数中使用Queue

代码语言:javascript
复制
def readMyCSVParWrap(clFile, outputq):
    outputq.put(readMyCSV(clFile))

我使用以下命令构建流程:

代码语言:javascript
复制
processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile,output)) for singleFile in allFiles[:5]]

如果我这样做,代码将永远不会停止运行,并且永远不会打印结果。

如果我只将clFile字符串放在输出队列中,例如:

代码语言:javascript
复制
outputq.put((clFile))

结果被正确地打印出来(只是一个clFiles列表)

当我查看htop时,我看到5个进程正在衍生,但它们不使用任何CPU。

最后,如果我单独运行readMyCSV函数(返回一个Pandas DataFrame),它就能正常工作。

我有什么地方做错了吗?我在一台Jupyter笔记本上运行它,也许这是一个问题?

EN

回答 1

Stack Overflow用户

发布于 2018-08-18 05:01:33

您在进程上的join-statements似乎正在导致死锁。进程不能终止,因为它们等待队列中的项被使用,但在您的代码中,只有在联接之后才会发生这种情况。

连接使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目都由“feeder”线程提供给底层管道。(子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)

这意味着,无论何时使用队列,都需要确保所有已放入队列的项最终都会在进程加入之前被移除。否则,您不能确定已将项放入队列的进程是否会终止。还要记住,非守护进程将自动加入。docs

文档进一步建议将行与queue.getjoin互换,或者直接删除join

同样重要的是:

使用if ...protect == ' main ':确保新的Python解释器可以安全地导入主模块,而不会对程序的“入口点”造成意外的副作用(例如启动新进程)。ibid

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51900978

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档