前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 线程同步(三) -- 信号量

python 线程同步(三) -- 信号量

作者头像
用户3147702
发布2022-06-27 13:30:40
1.8K0
发布2022-06-27 13:30:40
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

上两篇文章中,我们详细介绍了 Python 中的两种线程同步方式 — 锁与条件对象。 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象

本文我们来介绍一个计算机科学史上最为古老和经典的线程同步原语之一 — 信号量。

2. 信号量

我们此前已经介绍过 Linux 的信号量与 Java 中的信号量。

信号量是操作系统中的一个经典线程同步原语,实际上他是带有计数功能的互斥锁,用来保护某个只允许指定数量操作的资源。 信号量与锁机制非常类似,但他维护了一个内部的计数值,每次加锁原子性的将计数值减1,返回不为负则意味着加锁成功,否则加回1并阻塞等待直到被唤醒,而解锁时则在信号量计数上进行加1操作。 一般来说,对计数值的修改是通过 CAS 操作实现的。 CAS 思想与 java 原子操作的实现

3. Python 中的信号量 — threading.Semaphore

python 标准库中的 threading 包中实现了信号量对象。

3.1. 构造方法

该对象的构造方法有一个参数 value 用于初始化上文所述的信号量内的计数值,默认为 1。

threading.Semaphore(value=1)

3.1.1. value 的取值

  • 当 value 传入大于 1,这是最为常用的用法,用来限制最多 value 个线程可以同时共享资源
  • 当 value 传入为 1 时,信号量退化为了一个普通的线程锁,虽然这是默认行为,但与 threading 中提供的锁对象相比,通过信号量实现基本的线程锁虽然在使用方式上是一样的,但其执行效率要低一些,因此不建议这样使用
  • 当 value 传入 0 时,所有试图加锁的线程都将阻塞在该信号量对象上,但 Python 允许不经加锁直接调用解锁方法来增加计数值,但这通常是错误的用法,应该避免这样使用
  • 当 value 传入小于 0 时,会抛出 ValueError 异常

3.2. 加锁

acquire(blocking=True, timeout=None)

加锁方法的执行逻辑我们已经在上面有过详细介绍。 Python 信号量的加锁方法允许传入两个参数,分别表示是否阻塞,与最长等待时间(秒数) 加锁成功则返回 True。

3.3. 解锁

release()

解锁方法就是将信号量中的计数器加 1,如果计数器的原值为 0,则唤醒所有阻塞在该信号量上的线程。 与普通的锁对象不同,Python 中的信号量允许在未加锁的情况下调用 release 方法来让计数器加 1。

代码语言:javascript
复制
import logging
from threading import Thread, Semaphore

class SemaphoreTestThread(Thread):
    def __init__(self, id, semaphore):
        super().__init__()
        self.id = id
        self.semaphore = semaphore

    def run(self) -> None:
        logging.info('%r start running' % self)
        try:
            while self.semaphore.acquire():
                logging.info('%r hold the semaphore' % self)
        finally:
            self.semaphore.release()

    def __repr__(self):
        return 'SemaphoreTestThread(%s)' % self.id

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

    semaphore = Semaphore(0)
    for i in range(10):
        thread = SemaphoreTestThread(i, semaphore)
        thread.start()

    logging.info('all the threads are running')
    for i in range(5):
        logging.info('add 1 on the semaphore')
        semaphore.release()

打印出了:

2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(0) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(1) start running 2019-05-12 22:12:24,000 - INFO: SemaphoreTestThread(2) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(4) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(5) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(6) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(7) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(8) start running 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(9) start running 2019-05-12 22:12:24,016 - INFO: all the threads are running 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(0) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(1) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(2) hold the semaphore 2019-05-12 22:12:24,016 - INFO: add 1 on the semaphore 2019-05-12 22:12:24,016 - INFO: SemaphoreTestThread(3) hold the semaphore

可以看到,我们创建了 10 个线程并启动,但由于信号量的初始计数为 0,因此所有 10 个线程在启动后均立即阻塞等待在信号量上。 我们的主线程在未对信号量加锁的情况下直接调用了 release 方法,这并没有报错,而是激活了 10 个线程中的某个线程运行。

4. 有界信号量 — BoundedSemaphore

上面的例子中,我们看到,Python 中的信号量允许我们在未加锁的情况下直接调用解锁方法来让信号量内计数器值加 1,这似乎让构造方法传入的 value 值失去了他的价值。 Python 中存在另一种信号量,他与我们上面讲解的信号量仅有一点区别,那就是当 release 方法试图将计数器增加到大于构造方法传入的 value 值时,会抛出 ValueError 异常。 因此,在通常使用中 Semaphore 与 BoundedSemaphore 并没有什么区别。

我们把上文未经加锁即解锁例子中的信号量改为 BoundedSemaphore 再来试一下:

代码语言:javascript
复制
import logging
from threading import Thread, BoundedSemaphore

class SemaphoreTestThread(Thread):
    def __init__(self, id, semaphore):
        super().__init__()
        self.id = id
        self.semaphore = semaphore

    def run(self) -> None:
        logging.info('%r start running' % self)
        try:
            while self.semaphore.acquire():
                logging.info('%r hold the semaphore' % self)
        finally:
            self.semaphore.release()

    def __repr__(self):
        return 'SemaphoreTestThread(%s)' % self.id

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

    semaphore = BoundedSemaphore(0)
    for i in range(10):
        thread = SemaphoreTestThread(i, semaphore)
        thread.start()

    logging.info('all the threads are running')
    for i in range(5):
        logging.info('add 1 on the semaphore')
        semaphore.release()

打印出了:

2019-05-13 00:08:35,020 - INFO: SemaphoreTestThread(0) start running 2019-05-13 00:08:35,024 - INFO: SemaphoreTestThread(1) start running 2019-05-13 00:08:35,025 - INFO: SemaphoreTestThread(2) start running 2019-05-13 00:08:35,027 - INFO: SemaphoreTestThread(3) start running 2019-05-13 00:08:35,028 - INFO: SemaphoreTestThread(4) start running 2019-05-13 00:08:35,034 - INFO: SemaphoreTestThread(5) start running 2019-05-13 00:08:35,039 - INFO: SemaphoreTestThread(6) start running 2019-05-13 00:08:35,043 - INFO: SemaphoreTestThread(7) start running 2019-05-13 00:08:35,053 - INFO: SemaphoreTestThread(8) start running 2019-05-13 00:08:35,060 - INFO: all the threads are running 2019-05-13 00:08:35,060 - INFO: add 1 on the semaphore Traceback (most recent call last): File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1741, in <module> 2019-05-13 00:08:35,054 - INFO: SemaphoreTestThread(9) start running main() File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1735, in main globals = debugger.run(setup[‘file’], None, None, is_module) File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\pydevd.py", line 1135, in run pydev_imports.execfile(file, globals, locals) # execute the script File "C:\Program Files\JetBrains\PyCharm 2019.1\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile exec(compile(contents+"\n", file, ’exec’), glob, loc) File "D:/Workspace/code/python/fluentpython/thread/semaphore.py", line 34, in <module> semaphore.release() File "C:\Users\zeyu\Anaconda3\lib\threading.py", line 483, in release raise ValueError("Semaphore released too many times") ValueError: Semaphore released too many times

5. 示例 — 一个简易的 DB 连接池

代码语言:javascript
复制
from threading import BoundedSemaphore, Semaphore

class PooledDB:
    def __init__(self, creator, minconnections, maxconnections, *args, **kwargs):
        self._args, self._kwargs = args, kwargs
        self._creator = creator
        self._minconnections = minconnections
        self._maxconnections = maxconnections
        self._max_semaphore = Semaphore(maxconnections)
        self._min_semaphore = BoundedSemaphore(minconnections)
        self._idle_cache = []
        idle = [self.get_connection() for _ in range(minconnections)]
        while idle:
            idle.pop().close()

    def get_connection(self, timeout=None):
        hold = self._max_semaphore.acquire(timeout=timeout)
        if hold:
            hold = self._min_semaphore.acquire(blocking=False)
            if hold:
                return self._idle_cache.pop(0)
            else:
                return PooledConnection(self._creator, self, args=self._args, kwargs=self._kwargs)
        return None

    def returnConnect(self, connection):
        try:
            self._min_semaphore.release()
            self._idle_cache.append(connection)
        except ValueError:
            connection.close(True)
        finally:
            self._max_semaphore.release()

class PooledConnection:
    def __init__(self, creator, pool, *args, **kwargs):
        self._pool = pool
        self._creator = creator
        self._con = self._creator.connect(args, kwargs)

    def close(self, force_close=False):
        if force_close:
            self._con.close()
        else:
            self._pool.returnConnect(self)

这只是一个用于示例的简易 DB 连接池实现,同时,对于连接类 PooledConnection 我们省略了 begin、commit、rollback、cursor、ping 等方法的实现,因为这些与我们本节的内容并没什么关系,只实现了连接创建方法与 close 方法,同时省略了所有的参数、边界判断,只为了让示例更加精简,很快我会写一篇详细介绍生产环境可用的 DB 连接池的源码解析,敬请期待。 上面的例子中,我们的连接池构造方法拥有两个参数 — 最大连接数和最小连接数。 我们创建了两个 BoundedSemaphore 对象,分别用来限制并发环境中的最大、最小连接数。

5.1. 创建连接

初始状态下我们就已经向空闲队列中添加了最小连接数个数个空闲连接,我们看到,在创建连接时,我们先试图对最大连接数信号量进行加锁,从而保证并发环境下连接池连接数不会超过 maxconnections 值。 然后,对最小连接数信号量进行了加锁,加锁成功则从空闲队列中获取连接,否则新建连接。

5.2. 关闭连接

关闭连接时,我们首先试图释放最小连接数信号量,这里就体现出了 BoundedSemaphore 的价值,一旦释放次数超过构造参数传入的 minconnections 则意味着我们的释放次数大于了加锁次数,也就是说,这个被释放连接并不是从空闲队列 _idle_cache 中取出的,而 BoundedSemaphore 在此时抛出 ValueError 异常让我们可以直接强制关闭该连接,而不是让他回到连接池。 与最小连接数信号量相比,最大连接数信号量使用 Semaphore 就可以了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. 信号量
  • 3. Python 中的信号量 — threading.Semaphore
    • 3.1. 构造方法
      • 3.1.1. value 的取值
    • 3.2. 加锁
      • 3.3. 解锁
      • 4. 有界信号量 — BoundedSemaphore
      • 5. 示例 — 一个简易的 DB 连接池
        • 5.1. 创建连接
          • 5.2. 关闭连接
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档