首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Python multiprocessing.Queue显然丢失了数据

Python multiprocessing.Queue显然丢失了数据
EN

Stack Overflow用户
提问于 2022-11-21 14:35:39
回答 1查看 30关注 0票数 -1

我试图利用多重处理来加快程序的速度。为此,我需要在尽可能多的进程之间并行化一个任务,比如说n。因为我不想创建任何我绝对必须要创建的进程,所以我创建了n-1新进程,启动它们,然后在当前进程上运行最后的工作,最后将所有的进程连接在一起。所有这些都通过队列进行通信。每个进程都通过参数传递其“工作份额”,因此每个进程只需要在它们完成时将结果放到队列中(这些结果中的每一个可以有6600个字母长)。

代码语言:javascript
运行
复制
def play(chosen_word):
    l=[chosen_word, chosen_word]
    return l

def partial_test(id, words, queue):
    print(f'Process {id} started and allocated {len(words)} words.')
    guesses=[]
    for word in words:
        guesses.append(play(word))

    print(f"Process {id} has finished ALL WORDS.") #debugging only
    queue.put((id, guesses))
    print(f'Process {id} added results to queue')
    queue.cancel_join_thread()
    print(f'Process {id} closed the queue and exited. Queue has aproximately {queue.qsize()} elements')


def full_test():
    #do stuff

    #create Queue for results
    queue=Queue()

    #initialize auxiliary processes
    processes=[Process(target=partial_test, args=(x, word_list[x*words_per_process:(x+1)*words_per_process], queue)) for x in range(process_count-1)]

    #start processes
    for process in processes:
        process.start()
    #run last process on the current thread
    partial_test(process_count-1, word_list[(process_count-1)*words_per_process:], queue)

    #join processes
    i=0
    for process in processes:
        process.join()
        print(f'Joined process {i} with main thread.')
        i+=1

    print("All processes finished!")

    #get results (they need to be in order) 
    results=[[] for _ in range(process_count)]
    i=0
    while not queue.empty():
        res=queue.get()
        results[res[0]]=res[1].copy()
        i+=1
    print(f"Got {i} results!")

    #do stuff with results

当我试图读取队列中的数据时,就会出现问题。每个进程都报告它将数据放在队列中,因此在最后一个进程加入之前,它上有n个元素。但是,当我试图获取它们并将它们放到结果列表中时,我只提取一个包含n进程(在主线程上运行的进程)记录的数据的单个元素。

我最初没有使用queue.cancel_join_thread(),但发现为了防止进程加入,即使在完成执行之后,它们也会等待缓冲区真正写入队列,在数据量很大的情况下,在调用queue.get()方法之前是不会这样做的。但是,由于我只有在所有进程完成后才能得到数据,所以永远不会调用它,程序也会被卡住。我认为这可能与此有关(虽然我不明白为什么它不会影响第n个进程),但我没有办法将数据从缓冲区强制刷新到队列中。

我还确信,代码的这一部分可能依赖于返回正确的数据,因为我已经在单个进程版本中测试了相同数据上的所有内容。

编辑:游戏功能只是一个替身,但是对于这个帖子的所有意图和目的来说,都相当于最初的一个,因为使用这个功能会产生完全相同的问题。发布原始代码以及它的所有依赖项将意味着发布我的大部分代码,这将使我们很难集中精力解决问题。

EN

回答 1

Stack Overflow用户

发布于 2022-11-21 22:08:21

所以,你的问题是:

我最初没有使用queue.cancel_join_thread(),但发现为了防止进程加入,即使在完成执行之后,它们也会等待缓冲区真正写入队列,在数据量很大的情况下,在调用queue.get()方法之前是不会这样做的。但是,由于我只有在所有进程完成后才能得到数据,所以永远不会调用它,程序也会被卡住。我认为这可能与此有关(虽然我不明白为什么它不会影响第n个进程),但我没有办法将数据从缓冲区强制刷新到队列中。

使用multiprocessing.Queue,读者需要与写作者并行阅读。队列是在有限大小的OS级别管道之上实现的,如果没有读取,编写器将很快填充管道的缓冲区并无法写入。你所犯的错误就是因为这个。你没有找到一个“强制冲洗”选项,因为没有地方可以冲洗-管道已经满了,没有人正在阅读来清除它。

cancel_join_thread不能解决这个问题。通过调用cancel_join_thread,你可以告诉,“不,我完全不介意你扔掉我的数据”,所以工作程序很高兴地退出而不完成对管道的写入,然后丢弃数据。只有当您不关心丢失的数据时,才应该使用cancel_join_thread的文档。

您可以尝试使用带有manager = multiprocessing.Manager()queue = manager.Queue()的托管队列,因为IIRC托管队列没有相同的限制,但它们避免此限制的方式涉及创建额外的服务器进程来管理队列,而设计程序的全部目的就是避免创建额外的进程。另外,我认为基于管理器的队列有额外的进程间通信开销。

我建议您只使用multiprocessing.Queue的设计方式--与写并行地从它读取。与其将主进程用作额外的工作人员,不如让它在开始工作后立即开始读取数据,并且只在读取完所有数据后才加入工作人员。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74520564

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档