一、队列queue
队列queue 多应用在多线程场景,多线程访问共享变量。
对于多线程而言,访问共享变量时,队列queue的线程安全的。
因为queue使用了一个线程锁(pthread.Lock()),以及三个条件变量(pthread.condition()),来保证了线程安全。
总结:队列提供了一个安全可靠的共享数据使用方案。
队列内置控制安全的几个参数,非用户使用 | 名称 | 作用 |
---|---|---|
self.mutex | 互斥锁 | 任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。共有两种操作require获取锁,release释放锁。同时该互斥锁被三个共享变量同时享有,即操作conditiond时的require和release操作也就是操作了该互斥锁。 |
self.not_full | 条件变量队列没满 | 当队列中有元素添加后,会通知notify其他等待添加元素的线程,唤醒等待require互斥锁,或者有线程从队列中取出一个元素后,通知其它线程唤醒以等待require互斥锁。 |
self.not_empty | 条件变量队列不为空 | 线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,唤醒等待require互斥锁后,读取队列。 |
self.all_tasks_done | 条件变量队列数据全部处理完 | 消费者线程从队列中get到任务后,任务处理完成,当所有的队列中的任务处理完成后,会使调用queue.join()的线程返回,表示队列中任务以处理完毕。 |
###queue的初始化函数###
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the three conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = _threading.Lock()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = _threading.Condition(self.mutex)
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = _threading.Condition(self.mutex)
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = _threading.Condition(self.mutex)
self.unfinished_tasks = 0
二、队列数据存取规则:
数据使用方式 | 类名 | 作用 | 示例 |
---|---|---|---|
FIFO先进先出 | Queue(maxsize) | 先进入队列的数据,先取出maxsize:>=0设置队列长度,0为无限长 | q = queue.Queue() |
FILO先进后出 | LifoQueue(maxsize) | 先进入队列的数据,最后取出maxsize:>=0设置队列长度,0为无限长 | q = queue.LifoQueue() |
Priority优先级 | PriorityQueue(maxsize) | 设置优先标志,优先取出高标志位maxsize:>=0设置队列长度,0为无限长 | q = queue.PriorityQueue() |
###例子一:先进先出###
import queue
q = queue.Queue()
for i in range(5):
q.put(i)
for i in range(5):
print(q.get(),end=" ")
#---结果---
0 1 2 3 4
###例子二:后进先出###
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
for i in range(5):
print(q.get(),end=" ")
#---结果---
4 3 2 1 0
###例子三:按优先标志位读取###
#参考其它资料,看到许多讲述优先级队列的实现,但是我觉得直接用元组的方式比较简单粗暴。
import queue
p = queue.PriorityQueue()
p.put((3,"3"))
p.put((1,"1"))
p.put((4,"4"))
p.put((2,"2"))
for i in range(3):
print(p.get())
#---结果:按元组索引0排序---
(1, '1')
(2, '2')
(3, '3')
(4, '4')
###例子四:多元组判断###
import queue
p = queue.PriorityQueue()
p.put((1,4,"a"))
p.put((2,1,"666"))
p.put((1,3,"4"))
p.put((2,2,"2"))
for i in range(3):
print(p.get())
#-----结果:元组对应的序号进行比较,主键是序号0,越往后,优先度越低。-----
(1, 3, '4')
(1, 4, 'a')
(2, 1, '666')
(2, 2, '2')
三、队列的常用方法和属性:
方法和属性 | 作用 | 示例 |
---|---|---|
task_done() | 1、标记之前的一个任务已经完成。2、由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。3、如果当前的join()当前处于阻塞状态,当前的所有元素执行后都会重启(意味着收到加入queue的每一个对象的task_done()调用的信息) | |
join() | 阻塞:等待队列所有任务执行结束。当消费者线程调用task_done(),队列中未完成的计数就会减少,直至计数为0,解除阻塞。 | |
put(item,block,timeout) | 把对象item放入队列:item:对象名称,必填项。block: 默认是True,如果队列满等待。 设置成False,如果队列满报Full异常。timeout:【block为True是生效】 默认是None,如果队列满了等待。 0:不等待,队列满了立即报Full。 正数1~:等待相应秒数,秒数到了,队列还是满的,报错Full。 | |
put_nowait(item) | 向队列里存对象,不等待,如果队列满了,报queue.Full错误 | |
get(block,timeout) | 从队列取出对象,并把对象从队列中删除block: 默认是True,队列为空等待。 可以变更为False,如果队列为空,报Empty错误。timeout:【block为True是生效】 默认是None,队列为空,等待。 0:不等待,队列为空直接报Empty。 正数1~:等待相应秒数,如果依然为空,则报Empty | |
get_nowait() | 从队列里取对象,不等待,如果队列为空,报queue.Empty错误 | |
qsize() | 返回队列长度的近似值。qsize长度不做为get和put方法的操作依据。 | |
empty() | 队列为空返回True不做为get和put方法的操作依据。 | |
full() | 队列满了返回True不做为get和put方法的操作依据。 |
四、队列数据进出规则实例 :
也是一个最简单的生产者消费者例子。
'''例子一:队列基本的进出规则'''
import queue,time,threading,random
def productor(name,s): # 生产者函数,向队列里放产品
time.sleep(s)
print ('服务员{}有时间了'.format(name))
q.put(name)
def customer(): # 消费者函数,从队列里取产品
s = q.get()
print ('服务员{}被叫走了'.format(s))
l = []
q = queue.LifoQueue() # 后进先出,把LifoQueue改成Queue,先进先出。
for i in range(5):
n = random.randint(1,7)
t = threading.Thread(target=productor,args=(i,n)) # 生产线程
l.append(t)
t.start()
for i in l:
i.join()
customer()
#-----运行结果:因为有Random,所以结果不固定,主要观察消费顺序。------
服务员0有时间了
服务员0被叫走了
服务员1有时间了
服务员1被叫走了
服务员4有时间了
服务员3有时间了
服务员2有时间了
服务员2被叫走了
服务员3被叫走了
服务员4被叫走了
参考资料:
http://python.jobbole.com/87592/