前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 线程同步(四) -- 事件对象与栅栏

python 线程同步(四) -- 事件对象与栅栏

作者头像
用户3147702
发布2022-06-27 13:31:07
3820
发布2022-06-27 13:31:07
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

我们已经介绍了 python 的几种线程同步工具。 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象 python 线程同步(三) — 信号量

本文介绍的线程同步工具相比上面已经介绍过的三类工具来说,更加简单实用。

2. 事件对象 — Event

事件的使用是线程间通信的最简单机制之一 — 一个线程发出事件信号,另一个线程等待并响应该信号。 python threading 包中提供的事件对象 Event 就是用来做这件事的。 当事件对象中的标志位由 True 变为 False,所有等待在该事件上的线程都将被唤醒。 因此,python 中的事件对象 Event 提供了以下方法供调用:

2.1. is_set

is_set()

返回事件标志是否为 True。

2.2. set

set()

将事件内部标志位设置为 True,接着唤醒所有等待在该事件上的线程。

2.3. clear

clear()

清除标志,将事件标志重置为 False,此后若干个线程又可以重新阻塞在该事件对象上。

2.4. wait

wait(timeout=None)

阻塞线程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞线程,直到调用 set() 方法将标志设置为true或者发生可选的超时。 如果是因为超时返回,则会返回 False,否则会返回 True。

2.5. 示例

下面的例子展示了所有5个线程均阻塞在一个事件对象上,直到3秒后,主线程调用 set 方法触发事件信号,可以看到所有 5 个线程均立即开始执行。

代码语言:javascript
复制
import logging
from threading import Thread, Event
from time import sleep

class EventThread(Thread):
    def __init__(self, event, id):
        super().__init__()
        self._event = event
        self._id = id

    def run(self):
        logging.info('%r start running' % self)
        self._event.wait()
        logging.info('%r continue running after event' % self)

    def __repr__(self):
        return 'EventThread(%s)' % self._id

if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

    event = Event()
    for i in range(5):
        thread = EventThread(event, i)
        thread.start()

    logging.info('main start sleeping')
    sleep(3)

    logging.info('main set event')
    event.set()

打印出了:

2019-05-14 09:15:50,626 - INFO: EventThread(0) start running 2019-05-14 09:15:50,626 - INFO: EventThread(1) start running 2019-05-14 09:15:50,626 - INFO: EventThread(2) start running 2019-05-14 09:15:50,626 - INFO: EventThread(3) start running 2019-05-14 09:15:50,626 - INFO: EventThread(4) start running 2019-05-14 09:15:50,626 - INFO: main start sleeping 2019-05-14 09:15:53,639 - INFO: main set event 2019-05-14 09:15:53,645 - INFO: EventThread(1) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(0) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(2) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(4) continue running after event 2019-05-14 09:15:53,645 - INFO: EventThread(3) continue running after event

3. 栅栏对象 — Barrier

栅栏类是另一个简单的同步原语,此前我们已经介绍过 Linux 与 Java 中的栅栏。 java 线程同步工具类

栅栏对象用于让多个线程互相等待。 他维护了一个内部的计数器,值由构造方法默认传入,每当有一个线程调用 wait 方法,则该值原子地减 1,直到减到 0,则让所有阻塞 wait 在该栅栏对象上的线程继续执行。

3.1. 构造方法

Barrier(parties, action=None, timeout=None)

  • 构造方法必须传入一个值,就是我们上文所说的计数初始值
  • 如果提供了可调用的 action 参数,它会在所有线程被释放时在其中一个线程中自动调用 action 方法
  • timeout 是默认的超时时间,如果没有在 wait() 方法中指定超时时间,则会使用该值作为超时时间

3.2. wait

wait(timeout=None)

栅栏对象中最重要的方法就是 wait 方法了。 线程阻塞等待,直到构造方法传入的 parties 个线程均阻塞等待在 wait 方法或超时,如果该方法传入的超时时间为 None,则使用构造方法传入的默认超时。 一旦超时发生,栅栏将立即进入破损状态,此时其他仍阻塞等待该栅栏的线程将收到 wait 方法抛出的 BrokenBarrierError 异常。 如果试图在已破损的栅栏对象上调用 wait 方法,也会立即抛出 BrokenBarrierError 异常。 返回一个数字,值为 0 到 parties - 1,解释器保证了所有等待在同一个栅栏上的线程中,每一个的返回值都不同,以便让你可以依赖 wait 方法的返回值来做一些处理。 如果创建栅栏对象时在构造函数中提供了 action 参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入破损状态。

3.3. reset

reset()

重置栅栏为默认的初始态。 如果栅栏中仍有线程等待释放,这些线程将会收到 BrokenBarrierError 异常。 除非非常必要,否则并不建议使用该方法,很多时候与其重用一个状态未知的栅栏,不如新建一个。

3.4. abort

abort()

使栅栏进入破损态。 这将导致所有已经调用和未来调用的 wait() 方法中引发 BrokenBarrierError 异常。

3.5. 栅栏对象的属性

  • parties — 冲出栅栏所需要的线程数量
  • n_waiting — 当前时刻正在栅栏中阻塞的线程数量
  • broken — 一个布尔值,值为 True 表明栅栏为破损态

3.6. 示例

栅栏的使用虽然简单,但却十分实用,在实际环境中,我们通常需要并发调用很多业务方的接口,并收集他们的返回,然后在所有接口均返回后再进行下一步处理。 但并不是所有接口的调用都是必须的,因此对于该场景,一个必要的优化方式是一旦收集到必要接口的返回,立即中断其他接口的调用,并开始这之后的操作。 上述需求如果使用栅栏来解决会显得非常简单而优雅,虽然 Python 中我们并不能在线程外终止线程,但我们可以通过栅栏的 abort 方法让那些尚未执行结束的线程一旦执行结束即抛出异常,从而让我们不需要去关注他们。 下面的例子模拟了上面描述的过程。

代码语言:javascript
复制
import logging
import random
from threading import Thread, Barrier
from time import sleep, time

class InterfaceThread(Thread):
    def __init__(self, majorbarrier, minorbarrier, id, major):
        super().__init__()
        self._majorbarrier = majorbarrier
        self._minorbarrier = minorbarrier
        self._id = id
        self._major = major

    def run(self):
        nsec = random.uniform(0, 4)
        logging.info('%r start running sleep %s' % (self, nsec))
        sleep(nsec)
        logging.info('%r after sleeping' % self)
        if self._major:
            try:
                result = self._majorbarrier.wait()
                if result == 0:
                    self._minorbarrier.abort()
            except:
                logging.error('%s waitting on majorbarrier aborted' % self)
                return
        else:
            try:
                self._minorbarrier.wait()
            except:
                logging.warning('%s watting on minorbarrier aborted' % self)
                return
        logging.info('%r continue running after barrier' % self)

    def __repr__(self):
        return 'InterfaceThread(%s【major: %s】)' % (self._id, self._major)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')

    start = time()
    majorbarrier = Barrier(4)
    minorbarrier = Barrier(3)
    threads = list()
    for i in range(6):
        threads.append(InterfaceThread(majorbarrier, minorbarrier, i, bool(i >= 3)))
    for thread in threads:
        thread.start()
    result = majorbarrier.wait()
    if result == 0:
        minorbarrier.abort()
    logging.info('run by %s' % (time() - start))

上面的例子中创建了两个栅栏对象,分别用来同步必要接口调用与非必要接口调用,我们通过随机 sleep 0 到 4 秒来模拟接口调用。 一旦必要栅栏的 wait 方法返回 0,则意味着必要接口已全部返回,此时可以通过调用非必要栅栏的 abort 方法来破坏非必要栅栏,同时程序继续执行,从而实现整体运行时间的最大限度缩短。

打印出了:

2019-05-14 14:00:05,045 - INFO: InterfaceThread(0【major: False】) start running sleep 1.3645551759667334 2019-05-14 14:00:05,050 - INFO: InterfaceThread(1【major: False】) start running sleep 3.5451267021153607 2019-05-14 14:00:05,050 - INFO: InterfaceThread(2【major: False】) start running sleep 3.0433784558963644 2019-05-14 14:00:05,052 - INFO: InterfaceThread(3【major: True】) start running sleep 2.0092681547999875 2019-05-14 14:00:05,053 - INFO: InterfaceThread(4【major: True】) start running sleep 2.266415383907653 2019-05-14 14:00:05,053 - INFO: InterfaceThread(5【major: True】) start running sleep 0.6692143957122372 2019-05-14 14:00:05,728 - INFO: InterfaceThread(5【major: True】) after sleeping 2019-05-14 14:00:06,416 - INFO: InterfaceThread(0【major: False】) after sleeping 2019-05-14 14:00:07,077 - INFO: InterfaceThread(3【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) after sleeping 2019-05-14 14:00:07,329 - INFO: InterfaceThread(4【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: run by 2.284111976623535 2019-05-14 14:00:07,329 - INFO: InterfaceThread(5【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - INFO: InterfaceThread(3【major: True】) continue running after barrier 2019-05-14 14:00:07,329 - WARNING: InterfaceThread(0【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,109 - INFO: InterfaceThread(2【major: False】) after sleeping 2019-05-14 14:00:08,110 - WARNING: InterfaceThread(2【major: False】) watting on minorbarrier aborted 2019-05-14 14:00:08,613 - INFO: InterfaceThread(1【major: False】) after sleeping 2019-05-14 14:00:08,613 - WARNING: InterfaceThread(1【major: False】) watting on minorbarrier aborted

可以看到,并发调用六个线程,按照 sleep 时间,应该在 3.5451267 秒以上。 而实际上,由于重要线程均以完成,主线程只用 2.284111976623535 秒便已返回。 这样,我们就实现了接口性能的大幅提升,但线程 1、2 由于 sleep 时间过长,没有能够在主线程返回前返回。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. 事件对象 — Event
    • 2.1. is_set
      • 2.2. set
        • 2.3. clear
          • 2.4. wait
            • 2.5. 示例
            • 3. 栅栏对象 — Barrier
              • 3.1. 构造方法
                • 3.2. wait
                  • 3.3. reset
                    • 3.4. abort
                      • 3.5. 栅栏对象的属性
                        • 3.6. 示例
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档