前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python—多进程的消息队列

python—多进程的消息队列

作者头像
py3study
发布2020-01-06 10:24:13
1.7K0
发布2020-01-06 10:24:13
举报
文章被收录于专栏:python3python3

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现

一、使用multiprocessing里面的Queue来实现消息队列

q = Queue() 

q.put(data)  #生产消息

data = q.get() #消费消息

例子:

代码语言:javascript
复制
from multiprocessing import Queue, Process

def write(q):
    for i in ["a","b","c","d"]:
        q.put(i)
        print("put {0} to queue".format(i))
        
def read(q):
    while 1:
        result = q.get()
        print("get {0} from queue".format(result))
        
def main():
    q = Queue()  #定义一个消息队列容器
    pw = Process(target=write,args=(q,)) #定义一个写的进程
    pr = Process(target=read,args=(q,))  #定义一个读的进程
    pw.start()   #启动进程
    pr.start()
    pw.join()    
    pr.terminate()
if __name__ == "__main__":
    main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue

二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplux为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。

例子:

代码语言:javascript
复制
from multiprocessing import Process,Pipe
import time

def proc1(pipe):
    for i in xrange(1,10):
        pipe.send(i)
        time.sleep(3)
        print("send {0} to pipe".format(i))
        
def proc2(pipe):
    n = 9
    while n>0:
        result = pipe.recv()
        time.sleep(3)
        print("recv {0} from pipe".format(result))
        n -= 1
        
if __name__ == "__main__":
    pipe = Pipe(duplex=False)  #定义并实例化一个管道
    print(type(pipe))
    p1 = Process(target=proc1,args=(pipe[1],))   #pipe[1],管道的右边,表示进入端,发送数据
    p2 = Process(target=proc2,args=(pipe[0],))   #pipe[0],管道的左边,表示出口端,接收数据
    p1.start()
    p2.start()
    
    p1.join()
    p2.join()
    
    pipe[0].close()
    pipe[1].close()

运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe

三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put

以下两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作

例子:

代码语言:javascript
复制
from multiprocessing import Process, Pipe, Queue
import time
from threading import Thread

class Proceduer(Thread):
    def __init__(self,queue):
        super(Proceduer,self).__init__() # 超类
        self.queue = queue   #将queue赋给self.queue,便于类中其他函数调用
        
    def run(self):
        try:
            for i in xrange(1,10):
                print("put data is: {0} to queue".format(i))
                self.queue.put(i)
        except Exception as e:
            print("put data error")
            raise e
            
class Consumer_odd(Thread):
    def __init__(self,queue):
        super(Consumer_odd, self).__init__()
        self.queue = queue
        
    def run(self):
        try:
            while self.queue.empty:   #判断消息队列是否为空
                number = self.queue.get()  #取到消息值
                if number%2 != 0:
                    print("get {0} from queue ODD".format(number))
                else:
                    self.queue.put(number)  #将信息放回队列中
            time.sleep(1)
        except Exception as e:
            raise e
            
class Consumer_even(Thread):
    def __init__(self,queue):
        super(Consumer_even,self).__init__()
        self.queue = queue
    def run(self):
        try:
            while self.queue.empty:
                number = self.queue.get()
                if number%2 == 0:
                    print("get {0} from queue Even,thread name is :{1}".format(number,self.getName()))
                else:
                    self.queue.put(number)
                time.sleep(1)
        except Exception as e:
            raise e
            
def main():
    queue = Queue()  #实例化一个消息队列
    p = Proceduer(queue=queue)  #消息队列作为参数赋值给生产者函数,并实例化
    
    p.start()   #启动一个带消息队列的函数
    p.join()    #等待结束
    time.sleep(1)
    
    c1 = Consumer_odd(queue=queue)   #消息队列作为参数赋值给消费者函数,并实例化
    c2 = Consumer_even(queue=queue)    #消息队列作为参数赋值给消费者函数,并实例化
    c1.start()
    c2.start()
    c1.join()
    c2.join()
    print("All threads terminate!")
    
if __name__ == "__main__":
    main()

运行结果:

put data is: 1 to queue

put data is: 2 to queue

put data is: 3 to queue

put data is: 4 to queue

put data is: 5 to queue

put data is: 6 to queue

put data is: 7 to queue

put data is: 8 to queue

put data is: 9 to queue

get 1 from queue ODD

get 3 from queue ODD

get 4 from queue Even,thread name is :Thread-3

get 5 from queue ODD

get 7 from queue ODD

get 9 from queue ODD

get 2 from queue Even,thread name is :Thread-3

get 6 from queue Even,thread name is :Thread-3

get 8 from queue Even,thread name is :Thread-3

例子2:

代码语言:javascript
复制
import Queue

q = Queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()

运行结果:

0

1

2

3

4

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档