上一篇文章中,我们介绍了线程同步与 Python 中的锁机制。 Python 线程同步(一) — 竞争条件与线程锁
但锁机制只能解决最为简单和通用的线程同步场景,本文我们就来详细介绍更为复杂的场景下需要使用哪些新的线程同步工具 — 条件对象。
我们此前解析过 Java 中的条件对象的源码。 锁的等待与唤醒 — ConditionObject 源码解析
理解了 java 中的条件对象的执行原理,我们就会发现 python 中的条件对象与 java 中的条件对象实际上完全是一个东西。 有这样一个场景,订单的状态在不断变更,线程1关心订单支付成功状态并在此后做一些事,线程2关心订单发起退款状态并在此后做一些事,而业务线程则在业务执行过程中不断变更订单状态,而当订单一创建,我们需要让线程1、线程2阻塞等待,而只有到了预期状态被成功更新,才唤醒,而状态本身是一个竞争条件,其变更与查询都需要加锁。 看上去上面的场景非常复杂,但使用条件对象去处理就会非常方便。
条件对象总是保存有一个锁的引用,创建条件对象时可以作为参数传入,必须是 threading.Lock 或者 threading.RLock,如果没有传入,则会创建默认的 threading.RLock。 条件对象也有着加锁与解锁方法,条件对象只负责调用对象锁成员的对应方法。 加锁后,一旦调用 wait 方法,则自动释放锁并阻塞等待,此时,另一个等待锁的线程开始执行,直到该线程调用 notify 或 notify_all 方法并释放锁,等待着的线程才能继续执行。
acquire(*args) release()
如上文所述,加锁与解锁实际上是直接调用条件对象所持有的锁实例的对应方法。
wait(timeout=None)
阻塞等待直到被唤醒或超时。 必须在线程获取到锁之后再调用该方法,否则会抛出 RuntimeError。 这个方法释放锁,然后阻塞,直到在另外一个线程中调用同一个条件变量的 notify() 或 notify_all() 唤醒它,或者直到可选的超时发生。 如果条件对象持有的是 RLock,那么他不会调用 release 方法释放锁,而是调用 RLock 的内部接口,一次性释放。 从 python3.2 开始,这个方法总是返回 None。
wait_for(predicate, timeout=None)
等待,直到条件计算为真或超时。 predicate 应该是一个可调用对象而且它的返回值可被解释为一个布尔值。 与 wait 方法一样,wait_for 方法也支持传入一个 timeout 参数。 其实现方法大致相当于:
while not predicate():
cv.wait()
notify(n=1)
唤醒等待在这个条件对象的线程,传入参数为唤醒线程数量,默认为 1。 如果调用线程在没有获得锁的情况下调用这个方法,会引发 RuntimeError 异常。 需要注意的是,被唤醒的线程实际上不会返回它调用的 wait() ,直到它可以重新获得锁,而 notify 方法并不会释放锁。
notify_all()
唤醒所有正在等待这个条件对象的线程。 相当于:
cv.notify(threading.active_count())
条件对象也同样支持 python 上下文管理协议,下面我们通过条件对象及上下文管理协议实现我们开始时所设想的对订单状态的监听程序:
import logging
import random
from threading import Thread, Condition
class Queue():
def __init__(self):
self.runtimes = 0
self.front = -1
self.rear = -1
self.queue = []
def enqueue(self, ele): # 入队操作
self.queue.append(ele)
self.rear = self.rear + 1
def dequeue(self): # 出队操作
if self.isempty():
return None
node = self.queue.pop(0)
self.front = self.front + 1
return node
def isempty(self):
return self.front == self.rear
class CareStatusThread(Thread):
def __init__(self, carestatus, conobj, queue, threshold):
super().__init__()
self.orderids = []
self.conobj = conobj
self.queue = queue
self.notifystatus = carestatus
self.threshold = threshold
def run(self):
while True:
with self.conobj:
logging.info('%r start running' % self)
if self.queue.isempty():
logging.info('%r queue is empty' % self)
self.conobj.wait()
firstorder = None
orderids = list()
while True:
order = self.queue.dequeue()
if order is None or order == firstorder:
break
if order['status'] != self.notifystatus:
if firstorder is None:
firstorder = order
self.queue.enqueue(order)
continue
orderids.append(order['id'])
if len(orderids) > 0:
self.orderids.extend(orderids)
logging.info('%r orders%s add in list' % (self, orderids))
logging.info('%r run over' % self)
if self.queue.runtimes == self.threshold:
return
self.conobj.wait()
def __repr__(self):
return 'CareStatusThread(%s)' % self.notifystatus
class ProducerThread(Thread):
def __init__(self, orderlist, conobj, queue, threshold):
super().__init__()
self.orderlist = orderlist
self.conobj = conobj
self.queue = queue
self.threshold = threshold
def run(self):
for _ in range(self.threshold):
with self.conobj:
times = int(random.uniform(1, 5))
for _ in range(times):
index = int(random.uniform(0, len(self.orderlist)))
order = self.orderlist[index]
fromstatus = order['status']
order['status'] += 1
self.queue.enqueue(order)
logging.info('%r change order %s from %s to %s' % (self, order['id'], fromstatus, order['status']))
self.queue.runtimes += 1
logging.info('%r run over' % self)
self.conobj.notify_all()
def __repr__(self):
return 'ProducerThread'
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
conobj = Condition()
queue = Queue()
orderid = 10001
threshold = 10
orderlist = [{'id': orderid + i, 'status': 0} for i in range(50)]
producer = ProducerThread(orderlist, conobj, queue, threshold)
afterpay = CareStatusThread(1, conobj, queue, threshold)
afterrefund = CareStatusThread(2, conobj, queue, threshold)
afterpay.start()
afterrefund.start()
producer.start()
producer.join()
afterpay.join()
afterrefund.join()
logging.info('%r orderids: %s' % (afterpay, afterpay.orderids))
logging.info('%r orderids: %s' % (afterrefund, afterrefund.orderids))
打印出了:
2019-05-11 10:01:43,340 - INFO: CareStatusThread(1) start running 2019-05-11 10:01:43,342 - INFO: CareStatusThread(1) queue is empty 2019-05-11 10:01:43,343 - INFO: CareStatusThread(2) start running 2019-05-11 10:01:43,344 - INFO: CareStatusThread(2) queue is empty 2019-05-11 10:01:43,346 - INFO: ProducerThread change order 10020 from 0 to 1 2019-05-11 10:01:43,347 - INFO: ProducerThread change order 10041 from 0 to 1 2019-05-11 10:01:43,348 - INFO: ProducerThread run over 2019-05-11 10:01:43,348 - INFO: ProducerThread change order 10029 from 0 to 1 2019-05-11 10:01:43,349 - INFO: ProducerThread run over 2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) orders[10020, 10041, 10029] add in list 2019-05-11 10:01:43,350 - INFO: CareStatusThread(1) run over 2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10010 from 0 to 1 2019-05-11 10:01:43,351 - INFO: ProducerThread change order 10020 from 1 to 2 2019-05-11 10:01:43,352 - INFO: ProducerThread run over 2019-05-11 10:01:43,352 - INFO: ProducerThread change order 10003 from 0 to 1 2019-05-11 10:01:43,353 - INFO: ProducerThread run over 2019-05-11 10:01:43,353 - INFO: ProducerThread change order 10005 from 0 to 1 2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10010 from 1 to 2 2019-05-11 10:01:43,354 - INFO: ProducerThread change order 10032 from 0 to 1 2019-05-11 10:01:43,354 - INFO: ProducerThread run over 2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10025 from 0 to 1 2019-05-11 10:01:43,355 - INFO: ProducerThread change order 10034 from 0 to 1 2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10036 from 0 to 1 2019-05-11 10:01:43,356 - INFO: ProducerThread change order 10033 from 0 to 1 2019-05-11 10:01:43,357 - INFO: ProducerThread run over 2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10011 from 0 to 1 2019-05-11 10:01:43,357 - INFO: ProducerThread change order 10012 from 0 to 1 2019-05-11 10:01:43,358 - INFO: ProducerThread run over 2019-05-11 10:01:43,358 - INFO: ProducerThread change order 10045 from 0 to 1 2019-05-11 10:01:43,359 - INFO: ProducerThread change order 10036 from 1 to 2 2019-05-11 10:01:43,359 - INFO: ProducerThread run over 2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10035 from 0 to 1 2019-05-11 10:01:43,360 - INFO: ProducerThread change order 10013 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10014 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread change order 10039 from 0 to 1 2019-05-11 10:01:43,361 - INFO: ProducerThread run over 2019-05-11 10:01:43,362 - INFO: ProducerThread change order 10039 from 1 to 2 2019-05-11 10:01:43,362 - INFO: ProducerThread run over 2019-05-11 10:01:43,363 - INFO: CareStatusThread(2) orders[10010, 10020, 10010, 10036, 10036, 10039, 10039] add in list 2019-05-11 10:01:43,364 - INFO: CareStatusThread(2) run over 2019-05-11 10:01:43,364 - INFO: CareStatusThread(1) start running 2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) orders[10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014] add in list 2019-05-11 10:01:43,365 - INFO: CareStatusThread(1) run over 2019-05-11 10:01:43,366 - INFO: CareStatusThread(1) orderids: [10020, 10041, 10029, 10005, 10032, 10025, 10034, 10033, 10011, 10012, 10045, 10035, 10013, 10014] 2019-05-11 10:01:43,366 - INFO: CareStatusThread(2) orderids: [10010, 10020, 10010, 10036, 10036, 10039, 10039]
上面代码中,我们创建了三个线程:
我们看到 ProducerThread 执行变更状态后,通过 notify_all 唤醒状态监听线程后解锁,而 CareStatusThread 则紧接着执行相应的业务逻辑消费队列。 这是一个典型的生产者-消费者模型,最终我们看到两个消费者线程分别收集到了自己所关心的一系列订单id。
https://docs.python.org/zh-cn/3.6/library/threading.html。