Queue in Python

导读

这篇小文中我们会读到以下内容:

1) __all__属性

2) Lock和Condition

3) Queue

阅读时间

约10min

引子

defrun(self):

whileTrue:

try:

self.do_something(item)

exceptException, e:

logger.debug(str(e))

break

在一个线程的run方法中,把工作内容放在while True中,那它是不是永远都不会结束?(其中worklist是一个Queue对象) 答案当然是No.

Python文档中有如下描述:

Queue.get([block[,timeout]])

Remove and return an item from the queue. If optional args block is true and timeout isNone(the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

所以当while True里面的except catch到这个Empty exception时,也就是worklist里面不再有item时,跳出循环,线程随之结束。

显然发生这样的问题,有两个槽点:

1) 从写者的角度,作为一个码农,你写代码都不考虑可读性的吗?你不知道一段代码有70%的时间都是被读的吗?

2) 从读者的角度,作为一个Python码农,你怎么连这样一个基本数据结构的特性都不知道?小鲜肉们拍了哪些剧咋全知道呢?

竟无力反驳,是该多读点书了。那就去看看Queue在Python中到底是怎么实现的吧。

用法

上面的代码中,我们发现有一行非常醒目:

worklist.task_done()

让我们看看文档里怎么说的:

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For eachget()used to fetch a task, a subsequent call totask_done()tells the queue that the processing on the task is complete.

If ajoin()is currently blocking, it will resume when all items have been processed (meaning that atask_done()call was received for every item that had beenput()into the queue).

Raises aValueErrorif called more times than there were items placed in the queue.

只可意会不可言传的翻译(阅读英文无障碍者请大方越过此段):

表明之前出去的那个item已经处理完啦。通常被消费者线程使用。对于每一个调用get()来完成一个任务的线程,紧跟其后的调用task_done()可以告诉队列任务已经完成了。如果目前有一个join()调用被阻塞,那么在所有的item处理完时,它将被恢复(在所有item都task_done了之后)。如果task_done被调用的次数超过了队列中item的个数,就会抛出一个ValueError的异常。

我们还有最精华的使用方法哦(其他异常处理之类的自己去玩):

defworker():

whileTrue:

item=q.get()

do_work(item)

q.task_done()

q=Queue()

foriinrange(num_worker_threads):

t=Thread(target=worker)

t.daemon=True

t.start()

foriteminsource():

q.put(item)

q.join()# block until all tasks are done

准备

在Queue的实现中,会用到Python库中的两个基本线程同步机制:Lock和Condition. 我们先来看看他们俩的作用和用法。

Lock

最基本的同步机制之一,互斥锁。它只有两种状态locked和unlocked. 对应的有两方法:acquire和release分别用来获取锁和释放锁,这里就不多说了。

Condition

这个要说的就多了:

A condition variable is always associated with some kind of lock; this can be passed in or one will be created by default. (Passing one in is useful when several condition variables must share the same lock.)

A condition variable hasacquire()andrelease()methods that call the corresponding methods of the associated lock. It also has await()method, andnotify()andnotifyAll()methods. These three must only be called when the calling thread has acquired the lock, otherwise aRuntimeErroris raised.

Thewait()method releases the lock, and then blocks until it is awakened by anotify()ornotifyAll()call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.

Thenotify()method wakes up one of the threads waiting for the condition variable, if any are waiting. ThenotifyAll()method wakes up all threads waiting for the condition variable.

Note: thenotify()andnotifyAll()methods don’t release the lock; this means that the thread or threads awakened will not return from theirwait()call immediately, but only when the thread that callednotify()ornotifyAll()finally relinquishes ownership of the lock.

Tip: the typical programming style using condition variables uses the lock to synchronize access to some shared state; threads that are interested in a particular change of state callwait()repeatedly until they see the desired state, while threads that modify the state callnotify()ornotifyAll()when they change the state in such a way that it could possibly be a desired state for one of the waiters.

羞涩的翻译:(同样,英文好的小锅锅越过此段)

条件变量总是跟一种锁关联,可以是通过参数传进来的也可以是Condition对象默认创建的。(传入锁在多个条件变量必须使用同一个锁的时候会很有用)

条件变量有acquire()和release()两个方法,这两个方法会调用关联锁的对应方法。当然还有wait(), notify(), notifyAll()这些方法。这三个方法必须在线程获取锁之后调用,否则会抛出一个RuntimeError.

wait()方法会释放锁,并且阻塞直到notify()或者notifyAll()在另一个线程被调用。一旦被唤醒,它重新获取锁并且返回。 它也接受一个指定的timeout时间。

notify()方法唤醒等待条件变量的线程,如果有的话。notifyAll()方法唤醒所有等待条件变量的线程。

注意:notify()和notifyAll()方法不会释放锁,也就是说线程不会从wait()调用立即返回,只有当调用notify()或者notifyAll()的线程释放锁之后,等待的线程才会从wait()方法返回。

Tip(咋翻=.=):我们使用条件变量,通常(典型编程风格让我怎么翻)都是用锁来同步访问某个共享状态。只对共享状态的某个特殊值的线程通过重复调用wait()直到看到它想要的值。而那些修改状态值的线程可以通过调用notify()或者notifyAll()来通知那些等待的线程。

下面又是一个简单粗暴直观的栗子:

# Consume one item

cv.acquire()

whilenotan_item_is_available():

cv.wait()

get_an_available_item()

cv.release()

# Produce one item

cv.acquire()

make_an_item_available()

cv.notify()

cv.release()

上面提到的多个条件变量共享一个锁的情况,我们会在Queue的源代码里面看到。

一本正经的正文部分

准备了这么久,啰嗦了这么多,终于要开始看Queue的源代码了,好激动>-

在Queue.py文件中除了Empty和Full这两个exception类外,还有三个类,分别是Queue, PriorityQueue, LifoQueue. 其中,PriorityQueue和LifoQueue是Queue的子类。它们的关系大致如下:

让我们先来喵一眼Queue的实现再去看子类们又做了哪些不同的事情。

__all__

看到第11行就会发现,可以展开的内容太多了:

__all__ = ['Empty','Full','Queue','PriorityQueue','LifoQueue']

Queue中定义了__all__属性,则在被导入时(from Queue import *),只有在__all__属性中指定的属性、方法和类可以被导入. 如果没有定义,则导入模块内所有的公有属性,方法和类。所以使用__all__可以避免在引用时的命名冲突。

注意两点:

1) __all__属性只影响 from module import * 这种导入方式,对from module import member没有影响,module中的member仍然可以从外部导入

2) 在没有__all__的情况下,_member(protected)和__member(private)不会被导入,但可以通过from module import _member, __member导入

__init__

这是个小插曲,现在~~咳咳~~进入Queue的__init__方法:

def__init__(self, maxsize=):

self.maxsize = maxsize

self._init(maxsize)

self.mutex = _threading.Lock()

self.not_empty = _threading.Condition(self.mutex)

self.not_full = _threading.Condition(self.mutex)

self.all_tasks_done = _threading.Condition(self.mutex)

self.unfinished_tasks =

它接受一个maxsize的参数,如果maxsize

通过self._init(maxsize)初始化了一个主要的成员变量self.queue。这个方法会在子类中被覆盖,根据需求来初始化不同的数据结构。这里的self.queue是一个deque双端队列,它定义在collections里。

初始化一个互斥锁self.mutex

通过互斥锁初始化三个条件变量self.not_empty,self.not_full和self.all_tasks_done

设置一个为完成任务的值self.unfinished_tasks,初始化为0

task_done

接下来就是看上去神秘高冷,实际上朴实无华的task_done了

deftask_done(self):

self.all_tasks_done.acquire()

try:

unfinished =self.unfinished_tasks -1

ifunfinished

ifunfinished

raiseValueError('task_done() called too many times')

self.all_tasks_done.notify_all()

self.unfinished_tasks = unfinished

finally:

self.all_tasks_done.release()

把源码里的注释去掉了,跟在线文档的描述一致。

方法的最开头和最后,获取和释放了all_tasks_done的关联锁。

在临界区,首先获得未完成任务的数量,即当前数目减1.

如果unfinished小于0,完蛋了,出错了,raise exception.

如果unfinished等于0,太好了,任务都做完了,通知所有在等待all_tasks_done的线程,大家可以收工开party了,但是注意这里等待线程不会从wait()里立即返回,直到task_done最后释放all_tasks_done的关联锁。

如果unfinished大于0,还有任务没完成,加油干吧骚年,把unfinished值正式赋值给self.unfinished_tasks.

join

defjoin(self):

self.all_tasks_done.acquire()

try:

whileself.unfinished_tasks:

self.all_tasks_done.wait()

finally:

self.all_tasks_done.release()

join是task_done的好基友,就他俩使用all_tasks_done.

同样地,在join的开头和结尾,会获取和释放all_tasks_done的关联锁。

然后在while里面不断去check unfinished_tasks是否为0,如果不为0,就调用all_tasks_done的wait(). 注意wait()会释放锁,在task_done调用notifyAll并且释放锁之后,又重新获取锁,继续while循环,check是否已经做完所有的任务。最后在while条件为假,也就是所有任务都做完的时候,跳出循环,释放锁。唤醒调用join的线程。

qsize

defqsize(self):

"""Return the approximate size of the queue (not reliable!)."""

n =self._qsize()

returnn

defempty(self):

"""Return True if the queue is empty, False otherwise (not reliable!)."""

n =notself._qsize()

returnn

deffull(self):

"""Return True if the queue is full, False otherwise (not reliable!)."""

n =

returnn

然后有三个靠不住的小方法,自己都高调说明自己是不靠谱的了。在每个方法里面,访问_qsize时都会用互斥锁把自己锁住,增加安全感。

可以在full方法里看到那行n =

put

下面就要到Queue里面最重要的两个方法put和get啦

defput(self, item, block=True, timeout=None):

self.not_full.acquire()

try:

ifself.maxsize >:

ifnotblock:

ifself._qsize() ==self.maxsize:

raiseFull

eliftimeoutisNone:

whileself._qsize() ==self.maxsize:

self.not_full.wait()

eliftimeout

raiseValueError("'timeout' must be a non-negative number")

else:

endtime = _time() + timeout

whileself._qsize() ==self.maxsize:

remaining = endtime - _time()

ifremaining

raiseFull

self.not_full.wait(remaining)

self._put(item)

self.unfinished_tasks +=1

self.not_empty.notify()

finally:

self.not_full.release()

除了item以外,put还有block和timeout两个参数,默认情况下,调用的线程会等待直到有空位并且没有timeout时间。

首先获取not_full的关联锁,当然这个锁跟all_tasks_done还有not_empty是同一个,就是说这些方法都是互斥的。

然后给个大条件,maxsize大于0的时候,小于等于0的时候没必要等待,直接放进去。

如果是非阻塞的调用,就直接判断是不是满了,满了raise一个exception,不满直接put.

如果是阻塞调用,没有指定timeout时间,那就无限等待下去,直到有位子空出来。

如果timeout小于0,显然错误。

如果是阻塞调用又指定了正确的timeout时间,那我就开始计时啦!在while循环里,每次从wait中被唤醒,都算一下剩余时间,如果超时了,就raise一个Full exception。如果没超时就继续等待直到有空位出来。

等放进了item之后,就调用not_empty的notify()告诉等待的线程,现在已经有一个东东在队列里啦,你可以来处理一下了。

get

defget(self, block=True, timeout=None):

self.not_empty.acquire()

try:

ifnotblock:

ifnotself._qsize():

raiseEmpty

eliftimeoutisNone:

whilenotself._qsize():

self.not_empty.wait()

eliftimeout

raiseValueError("'timeout' must be a non-negative number")

else:

endtime = _time() + timeout

whilenotself._qsize():

remaining = endtime - _time()

ifremaining

raiseEmpty

self.not_empty.wait(remaining)

item =self._get()

self.not_full.notify()

returnitem

finally:

self.not_empty.release()

put()的好兄弟get()来了~~blcok和timeout参数跟put()一样。

在get()方法首尾,获取和释放not_empty的关联锁。

在try块中,如果是非block调用,qsize为0就直接抛出Empty异常。如果qsize不为0,就直接去queue里面_get(),然后调用not_full的notify(), 通知等待put的线程说我这有空位了,你来吧。

如果是block调用,并且没有设置timeout时间,就在while循环中调用not_empty的wait()方法,释放锁然后挂起,直到收到not_empty的notify并且重新acquire到锁才进入下一次循环。(这里只有put方法会在放入新的item后会调用not_empty的notify方法)

如果设置了timeout值但不合法,就抛ValueError异常。

如果block调用timeout值也合法,那就像put()方法一样,如果等到了item,则get,如果没等到,就抛Empty异常。

put_nowait & get_nowait

defput_nowait(self, item):

returnself.put(item,False)

def get_nowait(self):

returnself.get(False)

还有这两个方法,非阻塞调用,就是简单地给put()和get()的block置False即可

_init, _qsize, _put, _get

下面四个protected方法,是在子类PriorityQueue和LifoQueue中会重写的方法

# Initialize the queue representation

def_init(self, maxsize):

self.queue = deque()

def_qsize(self, len=len):

returnlen(self.queue)

# Put a new item in the queue

def_put(self, item):

self.queue.append(item)

# Get an item from the queue

def_get(self):

这里可以看到_init()方法里面,初始化self.queue的数据结构是deque,双端队列。在下面的_put()和_get()方法里调用的就是deque的append和popleft。记住这个双端队列,我们后面会深挖它。

到这里,Queue中的所有方法都看完了,揭开庐山真面目之后,发现它并不高冷,只是看上去高冷罢了。那它的两个子类PriorityQueue和LifoQueue更不在话下了。

PriorityQueue

def_init(self, maxsize):

self.queue = []

def_qsize(self, len=len):

returnlen(self.queue)

def_put(self, item, heappush=heapq.heappush):

heappush(self.queue, item)

def_get(self, heappop=heapq.heappop):

returnheappop(self.queue)

在_init()方法中,self.queue被初始化一个普通list。在_qsize(),_put(), _get()中,都有一个Queue中没有的参数,用来给出调用方法。在父类的方法中被调用时,都使用了默认值。_put和_get的heappush和heappop的默认值是heapq中对应的heappush和heappop。heapq是一个最小堆队列,所以每次heappush都会把item放到合适的位置,每次heappop都能拿到堆中的最小值。所以在PriorityQueue的注释中可以看到,它是一个lowest first的优先级队列。也记住这个heapq,我们后面也会深挖它。

LifoQueue

def_init(self, maxsize):

self.queue = []

def_qsize(self, len=len):

returnlen(self.queue)

def_put(self, item):

self.queue.append(item)

def_get(self):

顾名思义,这是一个后进先出的队列。默认情况下,队列是FIFO的数据结构,栈是LIFO。所以这个类完全可以当做stack来用,在源码的注释中也可获知一二:

# Override these methods to implement other queue organizations

# (e.g. stack or priority queue).

# These will only be called with appropriate locks held

在LifoQueue的_init()中,self.queue也被初始化为一个普通的list。但跟PriorityQueue不同的是,它的_put和_get都使用list的append和pop。简单实现了一个stack。

好啦,到此处,就已经把Queue.py里面的内容都看完了,回顾一下,我们遇到了两个跟Queue及其子类有依赖关系的类:deque和heapq。如果我们的内容再多一点,也许你就要不耐烦了。所以在后面一篇小文中会介绍它们俩。如果你看到此处已经超过了10min,说明你是真爱。Love you and see you~~

这里是馒头和奶油蛋糕的无逻辑小文栈,关注我吧!

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180402G1BAQE00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券