是计算机中已运行程序的实体。进程与程序不同,程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行的实体。
计算机程序只不过是磁盘中可执行的二进制(或其他类型)的数据。它们只有在被读取到内存中,被操作系统调用时才开始它们的生命周期。
进程(亦称为重量级进程)是程序的一次执行。每个进程都有自己的地址空间、内存、数据栈及其他记录其运行轨迹的辅助数据。操作系统管理在其上运行所有的进程,并为这些进程公平分配时间、进程也可以通过fork和spawn操作来完成其他的任务。
不过进程有自己的内存空间,数据栈等,所以只能使用进程间通讯(interprocess communication, IPC),而不能直接共享信息。
是操作系统能够进行运算调度的最小单位。他被包含在进程中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
线程(亦称为轻量级进程)跟进程有些相似,不同的是:所有的线程运行在同一个进程中,共享相同的运行环境。它们可以被想象成是在主进程或“主线程”中并行运行的“迷你进程”。
线程有开始,顺序执行和结束三部分。它有一个自己的指令指针,记录自己运行到什么地方。线程的运行可能被抢占(中断)或暂时的被挂起(睡眠),让其他线程运行,这叫做让步。
一个进程中的各个线程之间共享同一片数据空间,所以线程之间可以比进程之间更方便地共享数据以及相互通讯。线程一般都是并发执行的,正是由于这种并行和数据共享的机制使得多个任务的合作变成可能。
进程好比一列火车,多个进程就是多列火车。线程好比车厢,多个线程就是一条火车的多个车厢。线程需要在进程下运行,就好比单独的车厢无法行驶一样。
Python3 线程中常用的两个模块为:
thread 模块已被废弃。用户可以使用 threading 模块代替。所以,在 Python3 中不能再使用"thread" 模块。为了兼容性,Python3 将 thread 重命名为 "_thread"。本文中将介绍使用threading模块。
表示控制线程的类。这个类可以以有限的方式安全地子类化。有两种方法 指定活动:通过传递一个可调用对象给构造函数,或通过重写子类中的run()方法。
threading.Thread(
group=None,
target=None,
name=None,
args=(),
kwargs=None,
*,
daemon=None,
)
**group:**值为None,为以后版本而保留。 **target:**表示一个可调用对象,线程启动时,run()方法将调用此对象,默认为None,表示不用调用任何内容。 **name:**表示当前线程名称,默认创建一个"Thread-N"格式的唯一名称。 **args:**表示传递给target函数的参数元组。 **kwargs:**表示传递给target函数的参数字典。
threading模块对象 | 描述 |
---|---|
Thread | 表示一个线程的执行的对象 |
Lock | 锁原始对象(跟thread模块里的锁对象相同) |
RLock | 可重入锁对象。使单线程可以再次获得已经获得了的锁(递归锁定) |
Condition | 条件变量对象能让一个线程停下来,等待其他线程满足了某个“条件”。如状态的改变或值的改变 |
Event | 通用的条件变量。多个线程可以等待某个时间的发生,在事件发生后,所有的线程都被激活 |
Semaphore | 为等待锁的线程提供一个类似“等候室”的结构 |
BoundedSemaphore | 与Semaphore类似,只是它不允许超过初始值 |
Timer | 与thread类似,只是它要等待一段时间后才开始运行 |
线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
函数 | 描述 |
---|---|
start() | 启动线程活动。 |
run() | 定义线程的功能的函数(一般会被子类重写),用以表示线程活动的方法。 |
join(timeout=None) | 程序挂起,直到线程结束;如果给了timeout,则最多阻塞timeout秒。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。 |
getName() | 返回线程的名字 |
setName(name) | 设置线程的名字 |
isAlive() | 布尔标志,表示这个线程是否还在运行中 |
isDaemon() | 返回线程的daemon标志 |
setDaemon(daemonic) | 把线程的daemon标志设为daemonic(一定要在调用start()函数前调用) |
用Thread类,可以用多种方法来创建线程。
创建Thread实例,传给它一个函数
import threading
import time
def run():
# 每个子线程执行输出三次
for i in range(3):
time.sleep(2)
name = threading.current_thread().name
_time = time.ctime(time.time())
print("Thread name is {}, and time is {}\n".format(name, _time))
if __name__ == "__main__":
print("主线程开始".center(15,'-'))
# 创建四个线程,存入列表
num = 4
threads = [threading.Thread(target=run) for i in range(num)]
for thread in threads:
# 开启线程
thread.start()
for thread in threads:
# 等待子线程结束
thread.join()
print("主线程结束".center(15,'-'))
结果
-----主线程开始----- Thread name is Thread-1, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:23 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-1, and time is Fri Dec 19 14:55:25 2020 Thread name is Thread-4, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-3, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-2, and time is Fri Dec 19 14:55:27 2020 Thread name is Thread-1, and time is Fri Dec 19 14:55:27 2020 -----主线程结束-----
创建一个Thread实例,传给它一个可调用的类对象
与传一个函数很相似,但它是传一个可调用的类的实例供线程启动的时候执行,这是多线程编程的一个更为面向对象的方法。
import threading
import time
class threadFunc:
def __init__(self, threadID, counter):
self.name = threadID
self.counter = counter
def __call__(self):
print ("开始子线程{}\n".format(self.name))
print_time(self.name, 5, self.counter)
print ("结束子线程{}".format(self.name))
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("子线程{}: {}\n".format(threadName, time.ctime(time.time())))
counter -= 1
if __name__ == "__main__":
print("主线程开始".center(15,'-'))
# 创建四个线程,存入列表
counter = 2
num = 4
threads = [threading.Thread(target=threadFunc(i, counter)) for i in range(num)]
for thread in threads:
# 开启线程
thread.start()
for thread in threads:
# 等待子线程结束
thread.join()
print("主线程结束".center(15,'-'))
-----主线程开始----- 开始子线程0 开始子线程1 开始子线程2 开始子线程3 子线程0: Fri Dec 18 16:44:38 2020 子线程1: Fri Dec 18 16:44:38 2020 子线程2: Fri Dec 18 16:44:38 2020 子线程3: Fri Dec 18 16:44:38 2020 子线程0: Fri Dec 18 16:44:43 2020 子线程1: Fri Dec 18 16:44:43 2020 子线程2: Fri Dec 18 16:44:43 2020 子线程3: Fri Dec 18 16:44:43 2020 结束子线程0 结束子线程2 结束子线程3 结束子线程1 -----主线程结束-----
Thread派生一个子类,创建这个子类的实例
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("开始线程:{}\n".format(self.name))
print_time(self.name, 5, self.counter)
print ("退出线程:{}\n".format( self.name))
def print_time(threadName, delay, counter):
while counter:
if exitFlag:
threadName.exit()
time.sleep(delay)
print ("{}: {}\n".format(threadName, time.ctime(time.time())))
counter -= 1
if __name__ == "__main__":
# 实例化创建新线程
thread1 = myThread(1, "Thread-1", 5)
thread2 = myThread(2, "Thread-2", 5)
# 开启新线程
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print ("退出主线程")
结果
开始线程:Thread-1 开始线程:Thread-2 Thread-1: Fri Dec 19 15:11:46 2020 Thread-2: Fri Dec 19 15:11:46 2020 Thread-1: Fri Dec 19 15:11:51 2020 Thread-2: Fri Dec 19 15:11:51 2020 Thread-1: Fri Dec 19 15:11:56 2020 Thread-2: Fri Dec 19 15:11:56 2020 Thread-1: Fri Dec 19 15:12:01 2020 Thread-2: Fri Dec 19 15:12:01 2020 Thread-1: Fri Dec 19 15:12:06 2020 Thread-2: Fri Dec 19 15:12:06 2020 退出线程:Thread-1 退出线程:Thread-2 退出主线程
当一个线程结束计算,它就退出了。也可以使用Python退出进程的标准方法,如sys.exit()或抛出一个SystemExit异常等。不过,你不可以直接杀掉Kill一个线程。
threading模块能确保所有"重要的"子线程都退出后,进程才会结束。
主线程应该是一个好的管理者,它要了解每个线程都要做些什么事,线程都需要什么数据和什么参数,以及在线程结束的时候,它们都提供了什么结果。这样,主线程就可以把各个线程的结果组成一个有意义的最后结果。
输出
-----主线程开始----- 子线程Thread-5执行任务0 子线程Thread-4执行任务0 子线程Thread-5执行任务1 子线程Thread-4执行任务1 子线程Thread-4执行任务2 子线程Thread-5执行任务2 -----主线程结束----- An exception has occurred, use %tb to see the full traceback. SystemExit: 0
先看下面的例子:
from threading import Thread
import time
def plus():
time.sleep(2)
print('thread-1 start.'.center(20, '-'))
global num
num += 100
print(f'num +=100 = {num}')
print('thread-1 end.'.center(20, '-'))
def minus():
time.sleep(2)
print('thread-2 start.'.center(20, '-'))
global num
num -= 100
print(f'num -=100 = {num}')
print('thread-2 end.'.center(20, '-'))
num = 200
if __name__ == "__main__":
print("MainThread Start.".center(30, '-'))
print(f'num = {num}')
thread1 = Thread(target=plus)
thread2 = Thread(target=minus)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("MainThread End.".center(30, '-'))
结果
------MainThread Start.------- num = 200 --thread-2 start.--- num -=100 = 100 ---thread-2 end.---- --thread-1 start.--- num +=100 = 200 ---thread-1 end.---- -------MainThread End.--------
定义一个全球变量num = 100,两个线程分别对num进行处理,第一个num+100 = 200,第二个对num - 100 =200,说明两个线程之间是可以共享数据。
在上面的这种的情况下,就需要对全局变量通过一定的方式保护其不被随意修改,不然会造成多线程之间对全局变量使用的混乱。那么保护其不被任意修改,需要把这个资源"锁"住,只允许线程依次排队进去获取这个资源。
互斥锁(Mutual exclusion),互斥锁是一种简单的加锁的方法来控制对共享资源的访问,可以防止多个线程同时读写某一块内存区域。互斥锁只有两种状态,即上锁( lock )和解锁( unlock )。
threading模块中使用Lock类处理锁定。
# 创建锁
lock = threading.Lock()
# 锁定
lock.acquire()
# 释放锁
lock.release()
acquire(blocking=True, timeout=-1) -> bool 获取锁定,必要时需要阻塞到锁定释放为止。如果设置"blocking=False",则当无法获取锁定时将立即返回False,成功则返回True。 release() 释放一个锁定。当锁定处于未锁定时将出现错误。
from threading import Thread, Lock
import time
num = 100
def task():
global num
lock.acquire() # 上锁
temp = num
time.sleep(1)
num = temp - 1 # 实际任务,将全局变量num减1
print(f"Tasks have successfully completed, and now num is {num}")
lock.release() # 释放锁
if __name__ == "__main__":
lock = Lock()
# 实例化10个线程并已列表推导式形式保存在列表中
threads = [Thread(target=task) for _ in range(10)]
for thread in threads:
thread.start() # 创建线程
for thread in threads:
thread.join() # 等待子线程结束
结果
Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 98 Tasks have successfully completed, and now num is 97 Tasks have successfully completed, and now num is 96 Tasks have successfully completed, and now num is 95 Tasks have successfully completed, and now num is 94 Tasks have successfully completed, and now num is 93 Tasks have successfully completed, and now num is 92 Tasks have successfully completed, and now num is 91 Tasks have successfully completed, and now num is 90
如果不使用锁,则结果为
Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99 Tasks have successfully completed, and now num is 99
没有加锁的程序中,十个任务同时竞争同一个资源,每个线程都是第一时间拿到num并都执行task()「num - 1」。使用lock.acquire()函数实现资源锁定后,第一获取资源的线程锁定后,其他线程等待lock.release()解锁。所以每次只有一个线程执行task()函数,其结果是每次在上次的结果下减去1。
死锁产生的4个必要条件:
# 线程死锁,在classA和classB相互调用对方的方法,并且相互等待对方释放资源
class ClassA:
def __init__(self):
self.lock=threading.RLock()
# 得到classA的锁,试图得到classB的锁
def infoA(self, b):
self.lock.acquire()
time.sleep(10)
b.info()
self.lock.release()
def info(self):
self.lock.acquire()
print("this is ClassA info")
self.lock.release()
class ClassB:
def __init__(self):
self.lock = threading.RLock()
# 得到classB的锁,试图得到classA的锁
def infoB(self, a):
self.lock.acquire()
time.sleep(10)
a.info()
self.lock.release()
def info(self):
self.lock.acquire()
print("this is ClassB info")
self.lock.release()
ca=ClassA()
cb=ClassB()
def funA():
ca.infoA(cb)
def funB():
cb.infoB(ca)
# 函数调用不是线程,不会死锁
# funA()
# funB()
t1=threading.Thread(target=funA).start()
t2=threading.Thread(target=funB).start()
是一种特殊的结构,类似于列表。不过就像排队一样,队列中的元素一旦取出,那么就会从队列中删除。
某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。
假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。
其实当初这个模式,主要就是用来处理并发问题的。
缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
定义一个生产者类Producet,定义一个消费者Consumer。生产者生产10件产品并依次存入队列,而消费者依次从队列中取出产品。
from queue import Queue
import random
import threading
import time
# 定义生产者类
class Producer(threading.Thread):
def __init__(self, name, queue):
threading.Thread.__init(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("生产者{}往队列中放入产品{}".format(self.getName(), i))
self.data.put(i)
time.sleep(random.random())
print("生产者{}完成任务。".format(self.getName())
# 定义消费者类
class Consumer(threading.Thread):
def __init__(self, name, queue):
threading.Thread.__init(self, name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("消费者{}往队列中取出产品{}".format(self.getName(), i))
time.sleep(random.random())
print("消费者{}完成任务。".format(self.getName())
if __name__ == "__main__":
print("主线程开始".center(30,'-'))
queue = Queue()
producer = Producer('Producer', queue)
consumer = Consumer('Consumer', queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print("主线程结束".center(30,'-'))