在Python中,进程之间互相隔离,但是在实际的工作中需要两个进程能够进行数据的通信,那么就可以通过队列和管道的方式来实现进程之间的通信,那么就可以使用Queue,在整个Queue的机制里面,把数据可以通过put的方法放到队列里面,再通过get的方法可以获取到,但是需要特别注意的是在一个队列里面,当队列已满在加新的数据就会堵塞,当然当队列里面已经为空的时候,get获取数据也是为堵塞,如下通过具体的代码来演示下Queue的基本使用,具体如下:
from queue import Queue
def func():
queue=Queue(3)
for item in [x for x in range(3)]:
queue.put(item)
for item in range(3):
print(queue.get(item))
if __name__ == '__main__':
func()
如果在上面的代码中,我们把列表推导式里面的代码从range(3)修改到range(4),那么在添加第四位元素的时候就会堵塞,堵塞导致的情况是导致后面的代码也是无法执行,整个队列机制基本和瘫痪没什么区别,也也是同步请求的模式中最大的缺陷之一,当然还有另外一个缺陷就是超时。
可以通过Queue的特性来设计一个生产者消费者的模式,生产者就是往里面获取,而消费者从队列里面获取数据,具体实现的案例代码如下:
#! /usr/bin/env python
# -*- coding:utf-8 -*-
#author:无涯
import time as t
import random
from queue import Queue
from threading import Thread
class Producer(Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
a=random.randint(0,10)
b=random.randint(90,100)
print('生产者生产了两个数据:{0},{1}'.format(a,b))
self.queue.put((a,b))
t.sleep(2)
class Consumer(Thread):
def __init__(self,queue):
super().__init__()
self.queue=queue
def run(self):
while True:
#block=True表示,如果队列为空则阻塞在这里,直到队列有数据为止
num_tuple=self.queue.get(block=True)
sum_a_b=sum(num_tuple)
print('消费者消费了一组数:{0}+{1}={2}'.format(num_tuple[0],num_tuple[1],sum_a_b))
t.sleep(random.randint(0,10))
if __name__ == '__main__':
queue=Queue()
producer=Producer(queue=queue)
consumer=Consumer(queue=queue)
producer.start()
consumer.start()
while True:
t.sleep(1)
上面的代码就是一个很典型的生产者消费者的模式,通过Queue来实现把数据放到队列里面,然后从队列里面获取具体的数据,但是它也是存在具体的缺陷的,这种缺陷最典型的就是无法做数据的持久化,这是一点,那么第二点就是我们无法知道生产者知道积累了多少还需要等待消费者消费的数据,而这两点,使用Redis可以很轻松的来解决,同时了Redis也可以实现数据的缓存,以及发布订阅的模式,和高并发的模式下实现队列的等待,某些程度上承担调度的机制,下面通过Redis的方式没,来实现生产者消费者的模式,具体案例代码如下:
#! /usr/bin/env python
# -*- coding:utf-8 -*-
#author:无涯
import time as t
import json
import redis
import random
from threading import Thread
class Producer(Thread):
def __init__(self):
super().__init__()
self.queue=redis.Redis()
def run(self) -> None:
while True:
a=random.randint(0,10)
b=random.randint(90,100)
print('生产者生产了两个数据:{0},{1}'.format(a,b))
self.queue.rpush('生产者:',json.dumps((a,b)))
t.sleep(2)
class Consumer(Thread):
def __init__(self):
super().__init__()
self.queue=redis.Redis()
def run(self):
while True:
num_tuple=self.queue.blpop('producer')
a,b=json.loads(num_tuple[1].decode())
print('消费者消费了一组数:{0}+{1}={2}'.format(a,b,a+b))
t.sleep(random.randint(0,10))
if __name__ == '__main__':
producer=Producer()
consumer=Consumer()
producer.start()
consumer.start()
while True:
t.sleep(1)
这样一方面实现了数据的持久化,另外一方面也是可以如上面说的,能够获取到还未消费的数据,比如通过llen的命令来获取。
感谢您的阅读,后续会持续更新!服务端测试开发直播进行中,想参与的同学也可以扫码咨询。