我们已经介绍了 python 的几种线程同步工具。 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象 python 线程同步(三) — 信号量
本文介绍的线程同步工具相比上面已经介绍过的三类工具来说,更加简单实用。
事件的使用是线程间通信的最简单机制之一 — 一个线程发出事件信号,另一个线程等待并响应该信号。 python threading 包中提供的事件对象 Event 就是用来做这件事的。 当事件对象中的标志位由 True 变为 False,所有等待在该事件上的线程都将被唤醒。 因此,python 中的事件对象 Event 提供了以下方法供调用:
is_set()
返回事件标志是否为 True。
set()
将事件内部标志位设置为 True,接着唤醒所有等待在该事件上的线程。
clear()
清除标志,将事件标志重置为 False,此后若干个线程又可以重新阻塞在该事件对象上。
wait(timeout=None)
阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时。 如果是因为超时返回,则会返回 False,否则会返回 True。
下面的例子展示了所有5个线程均阻塞在一个事件对象上,直到3秒后,主线程调用 set 方法触发事件信号,可以看到所有 5 个线程均立即开始执行。
import logging
from threading import Thread, Event
from time import sleep
class EventThread(Thread):
def __init__(self, event, id):
super().__init__()
self._event = event
self._id = id
def run(self):
logging.info('%r start running' % self)
self._event.wait()
logging.info('%r continue running after event' % self)
def __repr__(self):
return 'EventThread(%s)' % self._id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
event = Event()
for i in range(5):
thread = EventThread(event, i)
thread.start()
logging.info('main start sleeping')
sleep(3)
logging.info('main set event')
event.set()
打印出了:
2019-05-14 09:15:50,626 - INFO: EventThread(0) start running 2019-05-14 09:15:50,626 - INFO: EventThread(1) start running 2019-05-14 09:15:50,626 - INFO: EventThread(2) start running 2019-05-14 09:15:50,626 - INFO: EventThread(3) start running 2019-05-14 09:15:50,626 - INFO: EventThread(4) start running 2019-05-14 09:15:50,626 - INFO: main start sleeping 2019-05-14 09:15:53,639 - INFO: main set event 2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event
栅栏类是另一个简单的同步原语,此前我们已经介绍过 Linux 与 Java 中的栅栏。 java 线程同步工具类
栅栏对象用于让多个线程互相等待。 他维护了一个内部的计数器,值由构造方法默认传入,每当有一个线程调用 wait 方法,则该值原子地减 1,直到减到 0,则让所有阻塞 wait 在该栅栏对象上的线程继续执行。
Barrier(parties, action=None, timeout=None)
wait(timeout=None)
栅栏对象中最重要的方法就是 wait 方法了。 线程阻塞等待,直到构造方法传入的 parties 个线程均阻塞等待在 wait 方法或超时,如果该方法传入的超时时间为 None,则使用构造方法传入的默认超时。 一旦超时发生,栅栏将立即进入破损状态,此时其他仍阻塞等待该栅栏的线程将收到 wait 方法抛出的 BrokenBarrierError 异常。 如果试图在已破损的栅栏对象上调用 wait 方法,也会立即抛出 BrokenBarrierError 异常。 返回一个数字,值为 0 到 parties - 1,解释器保证了所有等待在同一个栅栏上的线程中,每一个的返回值都不同,以便让你可以依赖 wait 方法的返回值来做一些处理。 如果创建栅栏对象时在构造函数中提供了 action 参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入破损状态。
reset()
重置栅栏为默认的初始态。 如果栅栏中仍有线程等待释放,这些线程将会收到 BrokenBarrierError 异常。 除非非常必要,否则并不建议使用该方法,很多时候与其重用一个状态未知的栅栏,不如新建一个。
abort()
使栅栏进入破损态。 这将导致所有已经调用和未来调用的 wait() 方法中引发 BrokenBarrierError 异常。
栅栏的使用虽然简单,但却十分实用,在实际环境中,我们通常需要并发调用很多业务方的接口,并收集他们的返回,然后在所有接口均返回后再进行下一步处理。 但并不是所有接口的调用都是必须的,因此对于该场景,一个必要的优化方式是一旦收集到必要接口的返回,立即中断其他接口的调用,并开始这之后的操作。 上述需求如果使用栅栏来解决会显得非常简单而优雅,虽然 Python 中我们并不能在线程外终止线程,但我们可以通过栅栏的 abort 方法让那些尚未执行结束的线程一旦执行结束即抛出异常,从而让我们不需要去关注他们。 下面的例子模拟了上面描述的过程。
import logging
import random
from threading import Thread, Barrier
from time import sleep, time
class InterfaceThread(Thread):
def __init__(self, majorbarrier, minorbarrier, id, major):
super().__init__()
self._majorbarrier = majorbarrier
self._minorbarrier = minorbarrier
self._id = id
self._major = major
def run(self):
nsec = random.uniform(0, 4)
logging.info('%r start running sleep %s' % (self, nsec))
sleep(nsec)
logging.info('%r after sleeping' % self)
if self._major:
try:
result = self._majorbarrier.wait()
if result == 0:
self._minorbarrier.abort()
except:
logging.error('%s waitting on majorbarrier aborted' % self)
return
else:
try:
self._minorbarrier.wait()
except:
logging.warning('%s watting on minorbarrier aborted' % self)
return
logging.info('%r continue running after barrier' % self)
def __repr__(self):
return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major)
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
start = time()
majorbarrier = Barrier(4)
minorbarrier = Barrier(3)
threads = list()
for i in range(6):
threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3)))
for thread in threads:
thread.start()
result = majorbarrier.wait()
if result == 0:
minorbarrier.abort()
logging.info('run by %s' % (time() - start))
上面的例子中创建了两个栅栏对象,分别用来同步必要接口调用与非必要接口调用,我们通过随机 sleep 0 到 4 秒来模拟接口调用。 一旦必要栅栏的 wait 方法返回 0,则意味着必要接口已全部返回,此时可以通过调用非必要栅栏的 abort 方法来破坏非必要栅栏,同时程序继续执行,从而实现整体运行时间的最大限度缩短。
打印出了:
2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334 2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607 2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644 2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875 2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653 2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372 2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping 2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping 2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535 2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping 2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping 2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted
可以看到,并发调用六个线程,按照 sleep 时间,应该在 3.5451267 秒以上。 而实际上,由于重要线程均以完成,主线程只用 2.284111976623535 秒便已返回。 这样,我们就实现了接口性能的大幅提升,但线程 1、2 由于 sleep 时间过长,没有能够在主线程返回前返回。