前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python模块之threading

python模块之threading

作者头像
枇杷李子橙橘柚
发布2019-05-26 11:05:22
9430
发布2019-05-26 11:05:22
举报
文章被收录于专栏:没有擅长的YcY没有擅长的YcY

threading在低级的_thread模块上构建了更高级的线程接口。

threading模块基于Java线程模型设计。不过Java中锁和条件变量是每个对象的基本行为,在python中却是单独的对象。python的Thread类行为是Java的Thread类行为的子集,目前尚不支持优先级、线程组,线程无法销毁、停止、暂停、恢复或中断。Java中Thread类的静态方法在Python中映射为模块级的函数。

模块级函数

threading.active_count() 返回当前活动的Thread对象的数量,与enumerate()函数返回的列表元素个数相同

threading.current_thread() 返回当前Thread对象,对应调用者的控制线程(thread of control)。如果调用者的控制线程不是通过threading模块创建,返回一个功能受限的哑线程对象(dummy thread object)

threading.get_ident() 返回一个非零整数,代表当前线程的"线程标识符"。这个值意在作为魔术cookie使用,例如作为索引从特定于线程的字典对象获取数据。当一个线程退出,新的线程创建,线程标识符可能被回收使用

threading.enumerate() 返回当前活动Thread对象的列表。该列表包含守护线程、current_thread()函数创建的哑线程,以及主线程,不包含已终止的线程和未启动的线程。

threading.main_thread() 返回主线程对象。通常来说,主线程就是启动python解释器的线程。

threading.settrace(func) 为启动自threading模块的所有线程设置一个trace函数。在每个线程的run()方法调用前,传递func参数给sys.settrace()

threading.setprofile(func) 为启动自threading模块的所有线程设置一个profile函数。在每个线程的run()方法调用前,传递func参数给sys.setprofile()

threading.stack_size([size]) 返回创建新线程使用的线程堆栈大小。

可选参数size指定后续创建的线程的堆栈大小,必须是0(表示使用平台或配置的默认值)或大于等于32768(32KiB)的正整数。如果未指定,默认size为0.

如果不支持改动线程堆栈大小,抛出RuntimeError异常。如果size不合法,抛出ValueError异常,堆栈大小保持不变。

32KiB是目前能保证解释器堆栈空间充足的最小值。某些平台可能对堆栈大小做了特殊的限制,比如要求最小堆栈大小在32KiB以上,或要求以系统内存页大小的倍数分配。

Windows系统及使用POSIX线程的系统可用

常量

threading.TIMEOUT_MAX 阻塞函数(Lock.acquire(), RLock.acquire(), Condition.wait()等)的timeout参数可接受的最大值。超出该值将抛出OverflowError异常。

Thread-Local Data

Thread-local数据的值是特定于线程的。管理Thread-local数据,只需要创建local或其子类的实例并在该实例上存储属性:

代码语言:javascript
复制
mydata = threading.local()
mydata.x = 1

不同的线程,实例的值也会不同。

class threading.local

表示thread-local数据的类。

Thread

Thread类代表在单独的控制线程中运行的活动,有两种方式指定:传递可调用对象到构造器的target参数,或重写子类的run()方法。除了__int__()方法和run()方法,Thread子类不应该重写除此之外的其他方法。

创建的线程对象,必须使用start()方法启动,start()在一个单独的控制线程调用run()方法。这时该线程被认为是"活动的"。当run()方法结束(正常执行完成或抛出了未处理的异常)时,线程对象不再是"活动的"。is_alive()方法可用于检查线程是否处于活动状态。

调用线程对象的join()方法将导致线程阻塞,直到调用join()方法的线程执行结束。

线程拥有名字,可以传递给构造器。通过name属性读取或修改。

主线程:对应python程序的初始控制线程。主线程不是守护线程。

守护线程:当没有非守护线程处于活动状态时,整个python程序将退出。通过daemon属性或构造器参数,可以标记一个线程为守护线程。daemon属性的初始值继承自创建该线程的线程

哑线程:对应"外部线程"alien thread,即在threading模块之外(比如C代码)启动的控制线程。哑线程具有有限的功能,总是认为是活动的和守护的,不能调用join()方法。它们永远不会被删除,因为不能检测外部线程的结束情况。

Note:守护线程将在程序关闭时直接停止。相关资源(比如打开的文件、数据库事务等)可能不会被妥善地释放。如果想要线程优雅地停止,将线程设置为非守护线程,并使用合适的信号机制比如Event

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

  • group:None。作为将来实现ThreadGroup类后的保留参数。
  • target:可调用对象,将被run()方法调用
  • name:线程名称。默认构建Thread-N形式的唯一名称。
  • args:target调用需要接收的位置参数,元组形式
  • kwargs:target调用需要接收的关键字参数,字典形式
  • daemon:传递一个布尔值,标记该线程是否为守护线程。None表示继承创建该线程的当前线程的daemon属性。

如果子类继承Thread并重写构造器,必须确保在执行线程的其他操作前在构造器中调用Thread.__init__()

start() 开启线程。每个线程最多只能调用一次,否则抛出RuntimeError异常。它将在一个单独的控制线程调用线程对象的run()方法。

run() 定义线程功能的方法,通常在子类中重写。标准的run()方法调用传入构造器的可调用对象target(存在的话),并使用args和kwargs分别作为target的位置参数和关键字参数。

代码语言:javascript
复制
# 创建Thread的实例,传给它一个函数

from threading import Thread
from time import sleep, ctime

sleep_time = [4, 2]


def task(task_tag, sleep_tag):
    print("task", task_tag, "started at:", ctime())
    sleep(sleep_tag)
    print("task", task_tag, "done at:", ctime())


def main():
    print("Main thread started at:", ctime())
    threads = []
    nloops = range(len(sleep_time))  # [0, 1]

    for i in nloops:
        t = Thread(target=task, args=(i, sleep_time[i]))
        threads.append(t)

    for i in nloops:
        threads[i].start()  # 启动线程

    for i in nloops:
        threads[i].join()  # 主线程阻塞,直至调用join()方法的线程终止

    print("Main thread done at:", ctime())


if __name__ == '__main__':
    main()
代码语言:javascript
复制
# 派生Thread的子类,并创建子类的实例

from threading import Thread
from time import sleep, ctime

sleep_time = [4, 2]


class MyThread(Thread):
    # 重写run()方法
    def run(self):
        print(self.name, "started at:", ctime())
        self._target(self._args)
        print(self.name, "done at:", ctime())


def task(sleep_tag):
    sleep(sleep_tag)


def main():
    print("Main thread started at:", ctime())
    threads = []
    nloops = range(len(sleep_time))

    for i in nloops:
        t = MyThread(target=task, args=sleep_time[i], name=task.__name__ + str(i))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print("Main thread done at:", ctime())


if __name__ == '__main__':
    main()

join(timeout=None) 阻塞主线程直到调用join方法的线程终止(可能是正常执行完成,也可能是抛出了未处理的异常)或达到timeout设定的时间。可多次调用。

  • timeout:阻塞时间(秒)。如果为None,表示一直阻塞直至调用join方法的线程终止;如果不为None,表示阻塞的时间,达到该时间后,不管调用join()方法的线程是否执行完成,继续执行主线程或其他启动的线程。

如果线程调用join()方法可能导致死锁,或在调用start()之前调用join(),抛出RuntimeError异常。

name 获取或设置线程名称。多个线程可能名称相同,初始值由构造器设置。

ident 线程标识符,如果为None说明该线程未启动。当一个线程退出,新的线程创建,线程标识符可能被回收使用。即使线程退出,该标识符仍可用。

is_alive() 判断线程是否处于活动状态。

daemon 布尔标志,表示这个线程是否是守护线程。必须在调用start()之前设置,否则抛出RuntimeError异常。初始值继承自创建该线程的线程。主线程不是守护线程,因此在主线程中创建的线程daemon属性默认值为False

CPython实现细节:在CPython中,由于GIL的原因,一次只有一个线程能够执行python代码(即使某些面向性能的库能克服这个限制???)。想要python程序更好地利用多核机器的计算机资源(计算密集型),建议使用multiprocessingconcurrent.futures.ProcessPoolExecutor。如果是同时运行多个I/O密集型任务,threading仍然不失为一个合适的模块

Lock

原语锁,是同步原语的一种,当它处于"locked"状态时不属于特定线程。在python中,这是目前可用的最低级的同步原语,实现自_thread扩展模块。

原语锁有两种状态:locked(锁定)或unlocked(未锁定)。创建时为未锁定状态。 原语锁有两种方法:acquire()release()。当锁处于未锁定状态时,acquire()改变其为锁定状态。当锁处于锁定状态时,调用acquire()方法将导致线程阻塞,直到其他线程调用release()释放锁。

class threading.Lock

acquire(blocking=True, timeout=-1) 获取锁。成功返回True,获取返回False。

  • blocking:默认为True,在获取到锁之前阻塞线程;反之即使没有获取到锁也不会阻塞线程。
  • timeout:指定线程阻塞的最长时间,单位为秒;-1表示无限制等待。当blocking为False时,禁止指定timeout参数

release() 释放锁。任何线程都可以调用,不只是获取了锁的线程。

锁更改为未上锁状态后,对于调用了acquire()方法而导致阻塞的线程,将由系统决定哪个线程获取到锁。

release()方法只能在上锁状态调用,否则将抛出RuntimeError异常。

RLock

重入锁,同步原语的一种,可由同一线程多次获取已持有的锁。除了原语锁的上锁/解锁状态,重入锁还使用了owning threadrecursion level的概念。在上锁状态,可能有多个线程拥有锁;在解锁状态,没有线程拥有锁。

acquire()/release()必须成对出现,可以嵌套,只有最后一个release(即最外层的release)调用才会最终释放锁。

class threading.RLock

acquire(blocking=True, timeout=-1) 使用默认参数调用时,如果当前线程已经拥有锁,增加1次递归深度并立即返回;如果是其他线程拥有锁,阻塞当前线程直到锁被释放。一旦锁释放(递归深度为0,此时锁不属于任何线程),各个线程争夺锁,并设置递归深度为1。

release() 释放锁且递归深度减1。如果调用后递归深度为0,重置锁为未锁定状态(不属于任何线程),由其他线程争夺锁。如果调用后递归深度非0,锁仍为上锁状态,属于当前线程。

只能由已经获取了锁的线程调用,否则抛出RuntimeError异常。

Condition

condition变量总是与某种锁相联系:传入或者默认创建的锁对象。传入锁对象适用于多个condition变量需要共享同一个锁的场景。锁是condition对象的一部分,不需要对锁单独进行追踪。

condition对象遵循上下文管理协议:使用with语句在封闭块内获取关联的锁对象,在condition对象上调用acquire和release实际上调用的是关联锁的对应方法。

class threading.Condition(lock=None)

条件变量允许一个或多个线程等待,直到接收到另一个线程的通知。

lock参数必须是LockRLock对象,作为底层的锁使用。默认使用RLock

acquire(*args) 调用底层lock对象的acquire()方法获取锁

release() 调用底层lock对象的release()方法释放锁

wait(timeout=None) 释放锁并阻塞当前线程直到被另外一个线程调用notify()notify_all()唤醒,或者达到设置的timeout时间,任意一种情况都将重新获取锁并返回。 只能由已获取到锁的线程调用,否则抛出RuntimeError异常。

3.2版本前该方法始终返回None,3.2版本开始除非超时会返回False,其他情况都返回True

wait_for(predicate, timeout=None) 阻塞当前线程直到可调用对象predicate返回值为True或bool()判断为True。

wait_for方法将不断调用wait()方法直到超时或满足predicate返回值为True或bool()判断为True。

返回值为最后一次执行predicate的返回值,如果超时返回False。

只能由已获取到锁的线程调用,否则抛出RuntimeError异常。

notify(n=1) 唤醒wait()或wait_for()状态下的某个线程。只能由已获取到锁的线程调用,否则抛出RuntimeError异常。

notify_all() 唤醒wait()或wait_for()状态下的所有线程。只能由已获取到锁的线程调用,否则抛出RuntimeError异常。

notify()和notify_all()并不释放锁。意思是调用wait()方法的线程不会立即返回,需要等到调用notify()和notify_all()的线程释放锁之后才返回。

代码语言:javascript
复制
# 生产者-消费者模式中Condition的用法

# 消费者:
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# 生产者:
with cv:
    make_an_item_available()
    cv.notify()

# 消费者(使用wait_for改进):
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

Semaphore Objects

信号量对象管理一个内部计数器,随着调用acquire()减1,release()调用加1,但一定不会小于0。当调用acquire()时如果计数器等于0将会阻塞线程直到某个线程调用release()方法。支持上下文管理器协议

class threading.Semaphore(value=1)

指定初始计数器的信号量,每调用一次release()加1,每调用一次acquire()减1。

acquire(blocking=True, timeout=None) 获取信号量。

使用默认参数调用时:

代码语言:javascript
复制
1. 如果计数器大于0,减1并立即返回True
2. 如果计数器等于0,阻塞直到某个线程调用release()唤醒,唤醒后计数器减1并返回True

release() 释放信号量。

class threading.BoundedSemaphore(value=1)

边界信号量,计数器值不能超过设置的最大边界。常用于限制资源占用的场景比如数据库连接。

Event Objects

事件是最简单的线程间通信机制。事件对象管理一个内部标志,调用set()时该标志为True,调用clear()时该标志为False,调用wait()时线程阻塞直到标志为True

class threading.Event

is_set() 如果事件标志为True,返回True

set() 设置事件标志为True。将唤醒所有调用了wait()而阻塞的线程。

clear() 重置事件标志为False。将阻塞所有调用了wait()的线程。

wait(timeout=None) 阻塞线程直到事件标志为True或超时。

Timer Objects

Timer继承自Thread,表示经过一定时间后要运行的任务。

class threading.Timer(interval, function, args=None, kwargs=None)

创建定时器,在interval时间后运行function任务。

cancel() 终止定时器并结束任务(仅对待执行状态中的任务有效)。


相关文章

Python 多线程: threading.local类

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 模块级函数
  • 常量
  • Thread-Local Data
    • class threading.local
    • Thread
      • class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
      • Lock
        • class threading.Lock
        • RLock
          • class threading.RLock
          • Condition
            • class threading.Condition(lock=None)
            • Semaphore Objects
              • class threading.Semaphore(value=1)
                • class threading.BoundedSemaphore(value=1)
                • Event Objects
                  • class threading.Event
                  • Timer Objects
                    • class threading.Timer(interval, function, args=None, kwargs=None)
                    • 相关文章
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档