上两篇文章中,我们详细介绍了 Python 中的两种线程同步方式 — 锁与条件对象。 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象
本文我们来介绍一个计算机科学史上最为古老和经典的线程同步原语之一 — 信号量。
我们此前已经介绍过 Linux 的信号量与 Java 中的信号量。
信号量是操作系统中的一个经典线程同步原语,实际上他是带有计数功能的互斥锁,用来保护某个只允许指定数量操作的资源。 信号量与锁机制非常类似,但他维护了一个内部的计数值,每次加锁原子性的将计数值减1,返回不为负则意味着加锁成功,否则加回1并阻塞等待直到被唤醒,而解锁时则在信号量计数上进行加1操作。 一般来说,对计数值的修改是通过 CAS 操作实现的。 CAS 思想与 java 原子操作的实现
python 标准库中的 threading 包中实现了信号量对象。
该对象的构造方法有一个参数 value 用于初始化上文所述的信号量内的计数值,默认为 1。
threading.Semaphore(value=1)
acquire(blocking=True, timeout=None)
加锁方法的执行逻辑我们已经在上面有过详细介绍。 Python 信号量的加锁方法允许传入两个参数,分别表示是否阻塞,与最长等待时间(秒数) 加锁成功则返回 True。
release()
解锁方法就是将信号量中的计数器加 1,如果计数器的原值为 0,则唤醒所有阻塞在该信号量上的线程。 与普通的锁对象不同,Python 中的信号量允许在未加锁的情况下调用 release 方法来让计数器加 1。
import logging
from threading import Thread, Semaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = Semaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()
打印出了:
2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running 2019-05-12 22:12:24,016 - INFO: all the threads are running 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore
可以看到,我们创建了 10 个线程并启动,但由于信号量的初始计数为 0,因此所有 10 个线程在启动后均立即阻塞等待在信号量上。 我们的主线程在未对信号量加锁的情况下直接调用了 release 方法,这并没有报错,而是激活了 10 个线程中的某个线程运行。
上面的例子中,我们看到,Python 中的信号量允许我们在未加锁的情况下直接调用解锁方法来让信号量内计数器值加 1,这似乎让构造方法传入的 value 值失去了他的价值。 Python 中存在另一种信号量,他与我们上面讲解的信号量仅有一点区别,那就是当 release 方法试图将计数器增加到大于构造方法传入的 value 值时,会抛出 ValueError 异常。 因此,在通常使用中 Semaphore 与 BoundedSemaphore 并没有什么区别。
我们把上文未经加锁即解锁例子中的信号量改为 BoundedSemaphore 再来试一下:
import logging
from threading import Thread, BoundedSemaphore
class SemaphoreTestThread(Thread):
def __init__(self, id, semaphore):
super().__init__()
self.id = id
self.semaphore = semaphore
def run(self) -> None:
logging.info('%r start running' % self)
try:
while self.semaphore.acquire():
logging.info('%r hold the semaphore' % self)
finally:
self.semaphore.release()
def __repr__(self):
return 'SemaphoreTestThread(%s)' % self.id
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
semaphore = BoundedSemaphore(0)
for i in range(10):
thread = SemaphoreTestThread(i, semaphore)
thread.start()
logging.info('all the threads are running')
for i in range(5):
logging.info('add 1 on the semaphore')
semaphore.release()
打印出了:
2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running 2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running 2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running 2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running 2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running 2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running 2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running 2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running 2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running 2019-05-13 00:08:35,060 - INFO: all the threads are running 2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore Traceback (most recent call last): File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module> 2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running main() File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main globals = debugger.run(setup[‘file’], None, None, is_module) File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run pydev_imports.execfile(file, globals, locals) # execute the script File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile exec(compile(contents+"\n", file, ’exec’), glob, loc) File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module> semaphore.release() File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release raise ValueError("Semaphore released too many times") ValueError: Semaphore released too many times
from threading import BoundedSemaphore, Semaphore
class PooledDB:
def __init__(self, creator, minconnections, maxconnections, *args, **kwargs):
self._args, self._kwargs = args, kwargs
self._creator = creator
self._minconnections = minconnections
self._maxconnections = maxconnections
self._max_semaphore = Semaphore(maxconnections)
self._min_semaphore = BoundedSemaphore(minconnections)
self._idle_cache = []
idle = [self.get_connection() for _ in range(minconnections)]
while idle:
idle.pop().close()
def get_connection(self, timeout=None):
hold = self._max_semaphore.acquire(timeout=timeout)
if hold:
hold = self._min_semaphore.acquire(blocking=False)
if hold:
return self._idle_cache.pop(0)
else:
return PooledConnection(self._creator, self, args=self._args, kwargs=self._kwargs)
return None
def returnConnect(self, connection):
try:
self._min_semaphore.release()
self._idle_cache.append(connection)
except ValueError:
connection.close(True)
finally:
self._max_semaphore.release()
class PooledConnection:
def __init__(self, creator, pool, *args, **kwargs):
self._pool = pool
self._creator = creator
self._con = self._creator.connect(args, kwargs)
def close(self, force_close=False):
if force_close:
self._con.close()
else:
self._pool.returnConnect(self)
这只是一个用于示例的简易 DB 连接池实现,同时,对于连接类 PooledConnection 我们省略了 begin、commit、rollback、cursor、ping 等方法的实现,因为这些与我们本节的内容并没什么关系,只实现了连接创建方法与 close 方法,同时省略了所有的参数、边界判断,只为了让示例更加精简,很快我会写一篇详细介绍生产环境可用的 DB 连接池的源码解析,敬请期待。 上面的例子中,我们的连接池构造方法拥有两个参数 — 最大连接数和最小连接数。 我们创建了两个 BoundedSemaphore 对象,分别用来限制并发环境中的最大、最小连接数。
初始状态下我们就已经向空闲队列中添加了最小连接数个数个空闲连接,我们看到,在创建连接时,我们先试图对最大连接数信号量进行加锁,从而保证并发环境下连接池连接数不会超过 maxconnections 值。 然后,对最小连接数信号量进行了加锁,加锁成功则从空闲队列中获取连接,否则新建连接。
关闭连接时,我们首先试图释放最小连接数信号量,这里就体现出了 BoundedSemaphore 的价值,一旦释放次数超过构造参数传入的 minconnections 则意味着我们的释放次数大于了加锁次数,也就是说,这个被释放连接并不是从空闲队列 _idle_cache 中取出的,而 BoundedSemaphore 在此时抛出 ValueError 异常让我们可以直接强制关闭该连接,而不是让他回到连接池。 与最小连接数信号量相比,最大连接数信号量使用 Semaphore 就可以了。