首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >为什么`multiprocessing.Queue.get`这么慢?

为什么`multiprocessing.Queue.get`这么慢?
EN

Stack Overflow用户
提问于 2017-11-02 22:14:16
回答 2查看 12.3K关注 0票数 20

在理解multiprocessing.Queue方面我需要帮助。我面临的问题是,与调用queue.put(...)和队列的缓冲区( deque)相比,从queue.put(...)获得结果的速度要慢得多。

这种泄漏的抽象使我研究了队列的内部结构。它直截了当的源代码只是指向deque执行,这看起来也很简单,以至于我无法用它来解释我所看到的行为。我还读到队列使用管道,但我似乎在源代码中找不到。

我把它简化为一个最小的例子来重现这个问题,我在下面指定了一个可能的输出。

代码语言:javascript
运行
复制
import threading
import multiprocessing
import queue

q = None
def enqueue(item):
    global q
    if q is None:
        q = multiprocessing.Queue()
        process = threading.Thread(target=worker, args=(q,))  # or multiprocessing.Process Doesn't matter
        process.start()
    q.put(item)
    print(f'len putted item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')


def worker(local_queue):
    while True:
        try:
            while True:  # get all items
                item = local_queue.get(block=False)
                print(f'len got item: {len(item)}. qsize: {q.qsize()}. buffer len: {len(q._buffer)}')
        except queue.Empty:
            print('empty')


if __name__ == '__main__':
    for i in range(1, 100000, 1000):
        enqueue(list(range(i)))

输出:

代码语言:javascript
运行
复制
empty
empty
empty
len putted item: 1. qsize: 1. buffer len: 1
len putted item: 1001. qsize: 2. buffer len: 2
len putted item: 2001. qsize: 3. buffer len: 1
len putted item: 3001. qsize: 4. buffer len: 2
len putted item: 4001. qsize: 5. buffer len: 3
len putted item: 5001. qsize: 6. buffer len: 4
len putted item: 6001. qsize: 7. buffer len: 5
len putted item: 7001. qsize: 8. buffer len: 6
len putted item: 8001. qsize: 9. buffer len: 7
len putted item: 9001. qsize: 10. buffer len: 8
len putted item: 10001. qsize: 11. buffer len: 9
len putted item: 11001. qsize: 12. buffer len: 10
len putted item: 12001. qsize: 13. buffer len: 11
len putted item: 13001. qsize: 14. buffer len: 12
len putted item: 14001. qsize: 15. buffer len: 13
len putted item: 15001. qsize: 16. buffer len: 14
len got item: 1. qsize: 15. buffer len: 14
len putted item: 16001. qsize: 16. buffer len: 15
len putted item: 17001. qsize: 17. buffer len: 16
len putted item: 18001. qsize: 18. buffer len: 17
len putted item: 19001. qsize: 19. buffer len: 18
len putted item: 20001. qsize: 20. buffer len: 19
len putted item: 21001. qsize: 21. buffer len: 20
len putted item: 22001. qsize: 22. buffer len: 21
len putted item: 23001. qsize: 23. buffer len: 22
len putted item: 24001. qsize: 24. buffer len: 23
len putted item: 25001. qsize: 25. buffer len: 24
len putted item: 26001. qsize: 26. buffer len: 25
len putted item: 27001. qsize: 27. buffer len: 26
len putted item: 28001. qsize: 28. buffer len: 27
len got item: 1001. qsize: 27. buffer len: 27
empty
len putted item: 29001. qsize: 28. buffer len: 28
empty
empty
empty
len got item: 2001. qsize: 27. buffer len: 27
empty
len putted item: 30001. qsize: 28. buffer len: 28

我希望您注意到有关结果的如下内容:插入元素28001后,工作人员发现队列中没有剩下的元素,而还有几十个元素。由于同步,我可以只获得其中的几个。但它只找到了两个!

这种模式还在继续。

这似乎与我在队列中放置的对象的大小有关。对于小型对象,例如i而不是list(range(i)),则不会出现此问题。但是,讨论的对象的大小仍然是千字节,几乎不足以造成如此严重的延迟(在我的现实世界中,非最小的例子,这容易花了几分钟)。

具体而言,我的问题是:如何在Python中的进程之间共享(不是这样)大量数据?此外,我想知道队列的内部实现中这种惰性是从哪里来的?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-11-13 17:52:32

我也遇到了这个问题。我发送的是大的numpy数组(~300 at ),在mp.queue.get()上它太慢了。

在研究了mp.Queue的python2.7源代码之后,我发现最慢的部分(在类似unix的系统上)是_conn_recvall() in connection.c,但我没有深入研究。

为了解决这个问题,我构建了一个实验性包FMQ

这个项目的灵感来源于multiprocessing.Queue (mp.Queue)的使用。由于管道的速度限制(在类似Unix的系统上),大型数据项的mp.Queue速度很慢。 在mp.Queue处理进程间传输时,FMQ实现了一个窃取线程,一旦任何项可用,该线程就会从mp.Queue中窃取一个项,并将其放入Queue.Queue中。然后,使用者进程可以立即从Queue.Queue中获取数据。 加速是基于这样的假设:生产者和消费者过程都是计算密集型的(因此需要多处理),而且数据很大(例如。>50 227x227图像)。否则,多处理的mp.Queue或线程处理的Queue.Queue就足够了。

fmq.Queue像mp.Queue一样容易使用。

请注意,仍然有一些已知问题,因为这个项目还处于早期阶段。

票数 8
EN

Stack Overflow用户

发布于 2019-05-13 19:47:32

对于未来的读者,您也可以尝试使用:

代码语言:javascript
运行
复制
q = multiprocessing.Manager().Queue()

而不是仅仅

代码语言:javascript
运行
复制
q = multiprocessing.Queue()

我还没有完全提炼和理解这种行为背后的机制,但是我读到的一个来源声称它是关于:

“当将大型项目推送到队列中时,尽管队列的put函数立即返回,但这些项基本上是缓冲的。”

作者继续解释更多关于它和修复方法,但对我来说,添加Manager做了一个简单而干净的技巧。

更新:我相信这个StackOverflow的答案在解释这个问题方面很有帮助。

在接受的答案中提到的FMQ也是Python2专用的,这也是我觉得这个答案有一天可能会帮助更多人的原因之一。

票数 12
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47085458

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档