首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何将python多处理和流水线技术结合起来?

如何将python多处理和流水线技术结合起来?
EN

Stack Overflow用户
提问于 2019-04-16 03:04:55
回答 1查看 60关注 0票数 0

你好,我是新来的,我想问一些question.Now,我正在使用python multiprocessing来处理队列中的数据。例如,我有3个函数来计算队列中的数据,在队列中我有3个数据。有没有可能使用多处理的流水线技术来使其更快?

在这段代码中,我尝试使用多进程队列来实现多进程之间的通信,并使用Lock来防止其他进程在之前使用队列中的数据。但是它

from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os

def a(pid, q1, q2, lock):
    while not q1.empty():
        data = q1.get()
        print("data from q1 is %s" % data)
        # for i in range(1000000):
        new_data = data*2
        lock.acquire()
        q2.put(new_data)
        print(pid)
        lock.release()

def b(pid, q2, q3, lock):
    while not q2.empty():
        data = q2.get()
        print("data from q2 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*3
        q3.put(new_data)
        print(pid)
        lock.release()

def c(pid, q3, q4, lock):
    while not q3.empty():
        data = q3.get()
        print("data from q3 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*4
        q4.put(new_data)
        print(pid)
        lock.release()

if __name__ == "__main__":

    number = [1,2,3]

    lock = Lock()

    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    q4 = Queue()

    for data in number:
        q1.put(data)

    p1 = Process(target=a,args=(1, q1, q2, lock))
    p2 = Process(target=b,args=(2, q2, q3, lock))
    p3 = Process(target=c,args=(3, q3, q4, lock))

    p1.start()
    p2.start()
    p3.start()

    p1.join()   
    p2.join()
    p3.join()

    for i in range(q4.qsize()):
        print(q4.get())

我尽我最大的努力解释事情应该如何工作,因为这是我第一次在stackoverflow中问一些问题,而且我的英语水平不好,而且我真的需要帮助。

EN

回答 1

Stack Overflow用户

发布于 2019-04-16 03:36:45

您的问题是您正在使用q.empty()来终止循环。其中一些Queues在开始时将是空的,并且这些Process将过早终止。您需要一种不同的技术来让p2p3进程知道何时退出。

下面是对您的代码的修改,它使用None作为队列中的标志,以在完成时发出信号:

from multiprocessing import Process, current_process, cpu_count, Queue, Pool, Lock, Array
from threading import Thread, current_thread
import time
import os

def a(pid, q1, q2, lock):
    while not q1.empty():
        data = q1.get()
        print("data from q1 is %s" % data)
        # for i in range(1000000):
        new_data = data*2
        lock.acquire()
        q2.put(new_data)
        print(pid)
        lock.release()
    q2.put(None)

def b(pid, q2, q3, lock):
    while True:
        data = q2.get()
        if data is None:
            q3.put(None)
            return
        print("data from q2 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*3
        q3.put(new_data)
        print(pid)
        lock.release()

def c(pid, q3, q4, lock):
    while True:
        data = q3.get()
        if data is None:
            return
        print("data from q3 is %s" % data)
        # for i in range(1000000):
        lock.acquire()
        new_data = data*4
        q4.put(new_data)
        print(pid)
        lock.release()

if __name__ == "__main__":

    number = [1,2,3]

    lock = Lock()

    q1 = Queue()
    q2 = Queue()
    q3 = Queue()
    q4 = Queue()

    for data in number:
        q1.put(data)

    p1 = Process(target=a,args=(1, q1, q2, lock))
    p2 = Process(target=b,args=(2, q2, q3, lock))
    p3 = Process(target=c,args=(3, q3, q4, lock))

    p1.start()
    p2.start()
    p3.start()

    p1.join()
    p2.join()
    p3.join()

    for i in range(q4.qsize()):
        print(q4.get())

而且,您实际上并不需要Lock。根据documentation的说法

队列模块实现多生产者、多消费者队列。当必须在多个线程之间安全地交换信息时,它在线程编程中特别有用。此模块中的Queue类实现了所有必需的锁定语义。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55695763

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档