第十五章 并发编程三剑客-进程, 线程与协程
书写上文, 我们可以知道: 进程是资源(CPU、内存等)分配的基本单位,它是程序执行时的一个实例. 程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行. 进程切换需要的资源很最大,效率低。
进程创建有两种方式: 方法包装和类包装. 熟悉Java的人可能会发现, 类包装更符合我们原来的书写习惯 创建进程后, 我们使用start() 来启动进程
主要步骤:
实操代码
import time
from multiprocessing import Process
class MyProcess(Process):
"""进程的创建方式: 2.类包装"""
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print(f"进程{self.name} 启动")
time.sleep(3)
print(f"进程{self.name} 结束")
if __name__ == "__main__":
print("创建进程")
p1 = MyProcess("p1")
p2 = MyProcess("p2")
p1.start()
p2.start()
执行结果
主要步骤:
实操代码
import os
import time
from multiprocessing import Process
def function(name):
"""进程的创建方式: 1.方法包装"""
print("当前进程ID:", os.getpid())
print("父进程ID", os.getppid())
print(f"Process:{name} start")
time.sleep(3)
print(f"Process:{name} end")
if __name__ == "__main__":
print("当前main进程ID: ", os.getppid())
# 创建进程
p1 = Process(target=function, args=("p1",))
p2 = Process(target=function, args=("p2",))
p1.start()
p2.start()
执行结果
多说一句
元组中如果只有一个元素, 是需要加逗号的!!! 这是因为括号( )既可以表示tuple,又可以表示数学公式中的小括号, 所以如果没有加逗号,那你里面放什么类型的数据那么类型就会是什么.
进程间通信有两种方式: Queue队列和Pipe管道方式
实现核心:
实操代码
from multiprocessing import Process, Queue
class MyProcess(Process):
def __init__(self, name, mq):
Process.__init__(self)
self.name = name
self.mq = mq
def run(self):
print("Process {} started".format(self.name))
print("===Queue", self.mq.get(), "===")
self.mq.put(self.name)
print("Process {} end".format(self.name))
if __name__ == "__main__":
# 创建进程列表
t_list = []
mq = Queue()
mq.put("1")
mq.put("2")
mq.put("3")
# 利用range序列重复创建进程
for i in range(3):
t = MyProcess("p{}".format(i), mq)
t.start()
t_list.append(t)
# 等待进程结束
for t in t_list:
t.join()
print(mq.get())
print(mq.get())
print(mq.get())
执行结果
Pipe 直译过来的意思是“管”或“管道”,和实际生活中的管(管道)是非常类似的. Pipe方法返回(conn1, conn2)代表一个管道的两个端.
实现核心
conn1, conn2 = multiprocessing.Pipe()
conn1.send/conn1.recv
实操代码
import multiprocessing
import time
def fun1(conn1):
"""
管道结构
进程<==>conn1(管道头)==pipe==conn2(管道尾)<==>进程2
"""
sub_info = "进程向conn1发送消息, 管道另一头conn2 可以接收到消息"
print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
time.sleep(1)
conn1.send(sub_info) # 调用conn1.send发送消息, 发送的消息会被管道的另一头接收
print(f"conn1接收消息:{conn1.recv()}") # conn1.recv接收消息, 如果没有消息可接收, recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError
time.sleep(1)
def fun2(conn2):
sub_info = "进程向conn2发送消息, 管道另一头conn1 可以接收到消息"
print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
time.sleep(1)
conn2.send(sub_info)
print(f"conn2接收消息:{conn2.recv()}")
time.sleep(1)
if __name__ == "__main__":
# 创建管道
# Pipe方法返回(conn1, conn2)代表一个管道的两个端.如果conn1带表头, conn2代表尾, conn1发送的消息只会被conn2接收, 同理conn2发送的消息也只会被conn1接收
conn1, conn2 = multiprocessing.Pipe()
# 创建子进程
# Python中,圆括号意味着调用函数. 在没有圆括号的情况下,Python会把函数当做普通对象
process1 = multiprocessing.Process(target=fun1, args=(conn1,))
process2 = multiprocessing.Process(target=fun2, args=(conn2,))
# 启动子进程
process1.start()
process2.start()
管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享
实现核心
实操代码
from multiprocessing import Manager, Process
def func1(name,m_list,m_dict):
m_dict['area'] = '罗布泊'
m_list.append('钱三强')
def func2(name, m_list, m_dict):
m_dict['work'] = '造核弹'
m_list.append('邓稼先')
if __name__ == "__main__":
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append("钱学森")
# 两个进程不能直接互相使用对象,需要互相传递
p1 = Process(target=func1, args=('p1', m_list, m_dict))
p1.start()
p1.join() # 等p1进程结束,主进程继续执行
print(m_list)
print(m_dict)
p2 = Process(target=func2, args=('p1', m_list, m_dict))
p2.start()
p2.join() # 等p2进程结束,主进程继续执行
print(m_list)
print(m_dict)
执行结果
太过机密, 不予展示
进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求; 反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行 使用进程池的好处就是可以节约内存空间, 提高资源利用率
进程池相关方法
类/方法 | 功能 | 参数 |
---|---|---|
Pool(processes) | 创建进程池对象 | processes表示进程池中有多少进程 |
pool.apply_async(func,args,kwds) | 异步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值 |
pool.apply(func,args,kwds) | 同步执行;将事件放入到进程池队列 | func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参 |
pool.close() | 关闭进程池 | |
pool.join() | 关闭进程池 | |
pool.map(func,iter) | 类似于python的map函数,将要做的事件放入进程池 | func 要执行的函数 iter 迭代对象 |
实现核心
实操代码
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print("方法2输出: ", args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func=func1, args=('进程1',), callback=func2)
pool.apply_async(func=func1, args=('进程2',), callback=func2)
pool.apply_async(func=func1, args=('进程3',), callback=func2)
pool.apply_async(func=func1, args=('进程4',))
pool.apply_async(func=func1, args=('进程5',))
pool.apply_async(func=func1, args=('进程6',))
pool.apply_async(func=func1, args=('进程7',))
pool.close()
pool.join()
执行结果
使用with 方法, 可以进行优雅的进行资源管理. 在这里是可以帮助我们优雅的关闭线程池
关于with方法
with所求值的对象必须有一个enter()方法,一个exit()方法. 紧跟with后面的语句被求值后,返回对象的
__enter__()
方法被调用, 这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法
实操代码
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1, ('进程1,', '进程2,', '进程3,', '进程4,', '进程5,', '进程6,', '进程7,', '进程8,'))
for a in args:
print(a)
执行结果
线程是程序执行时的最小单位,也是CPU调度和分派的基本单位. 一个进程可以由很多个线程组成,每个线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度 线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行. 同样多线程也可以实现并发操作,每个请求分配一个线程来处理.
Python的标准库提供了两个模块:
_thread
和threading
,_thread
是低级模块,threading
是高级模块, 是对_thread
进行了封装。多数情况下,我们只需要使用threading
这个高级模块. 而线程的创建的方式有两种: 一种是方法包装, 一种是类包装
主要步骤:
实操代码
from threading import Thread
from time import sleep
def func1(name):
"""线程创建方式(方法包装)"""
for i in range(3):
print(f"thread:{name} : {i}")
sleep(1)
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程
t1 = Thread(target=func1, args=("t1",))
t2 = Thread(target=func1, args=("t2",))
# 启动线程
t1.start()
t2.start()
print("主线程, end")
"""
注意: 运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的IO流
"""
结果展示
主要步骤
实操代码
from threading import Thread
from time import sleep
class MyThread(Thread):
"""
类的方式创建线程(类包装)
"""
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"thread : {self.name} : {i}")
sleep(1)
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
t2 = MyThread("t2")
# 启动线程
t1.start()
t2.start()
print("主线程, end")
结果展示
从结果可以看出, 启动线程对应的方法是异步的
由上图可知,主线程不会等待子线程结束(异步). 如果需要等待子线程结束后,再结束主线程,可使用join()方法
实操代码
if __name__ == "__main__":
"""新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
t2 = MyThread("t2")
# 启动线程
t1.start()
t2.start()
# 主线程等t1, t2结束之后, 再往下执行
t1.join()
t2.join()
print("主线程, end")
在行为上还有一种叫守护线程,主要的特征是在它的生命周期中主线程死亡,它也就随之死亡. 在python中,线程通过 setDaemon(True|False)来设置是否为守护线程.
实操代码
from threading import Thread
from time import sleep
class MyThread(Thread):
"""守护线程"""
def __init__(self, name):
Thread.__init__(self)
self.name = name
def run(self):
for i in range(3):
print(f"thread : {self.name} : {i}")
sleep(1)
if __name__ == "__main__":
print("主线程, start")
# 创建线程(类创建的方式)
t1 = MyThread("t1")
# 设置为守护线程
# t1.setDaemon(True) # 该方法3.10后被废弃, 可以直接按照下面的方法设置
t1.daemon = True
# 启动线程
t1.start()
print("主线程, end")
执行结果
从结果可以看出, 主线程不会等待子线程结束(也就是执行循环体的后续循环),
因为子线程被设置成守护线程, 因此主线程执行完毕后子线程就会停止执行
事件Event主要用于唤醒正在阻塞等待状态的线程
注意:
Event() 可以创建一个事件管理标志,该标志(event)默认为False,event对象主要有四种方法可以调用:
方法名 | 说明 |
---|---|
event.wait(timeout=None) | 调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行; |
event.set() | 将event的标志设置为True,调用wait方法的所有线程将被唤醒 |
event.clear() | 将event的标志设置为False,调用wait方法的所有线程将被阻塞 |
event.is_set() | 判断event的标志是否为True |
实操代码
import time
import threading
def chihuoguo(name):
"""利用事件实现就餐, 吃饭"""
# 等待时间, 进入阻塞等待状态
print(f'{name} 已经到餐厅')
print(f'小伙伴{name} 已经进入就餐状态!')
event.wait()
print(f'{name} 收到通知了.')
print(f'小伙伴{name} 开始吃咯!')
if __name__ == "__main__":
event = threading.Event()
# 创建新线程
threa1 = threading.Thread(target=chihuoguo, args=("玛奇玛",))
threa2 = threading.Thread(target=chihuoguo, args=("电次",))
# 开启线程
threa1.start()
threa2.start()
time.sleep(10)
# 发送时间通知
print('---->>>主线程(波吉塔)通知小伙伴开吃咯!')
event.set()
执行结果
event.wait()
之前的代码段都能够正常执行,
但只有事件标识被设置为 true 时event.set()
, event.wait()
之后的代码才会被执行event.wait()
这段代码注释后, 由于主线程没有变更事件的标识, 即事件标识一直为false
因此程序会一直阻塞直至该标识为true在python中,无论你有多少核,在Cpython解释器中永远都是假象. 无论你是4核,8核,还是16核,同一时间执行的线程只有一个线程. 这个是python开发设计的一个缺陷,所以说python中的线程是“含有水分的线程”.
全局锁, 又称GIL, 全称是: Global Interpreter Lock. 他是用来保证同一时刻只有一个线程在运行.
注意:
处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象. 这时候就需要用到“线程同步”. 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用
实操代码
from threading import Thread
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
if self.account.money - self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == "__main__":
a1 = Account(100, "timepause")
# 定义取钱线程对象
draw1 = Drawing(80, a1)
# 定义取钱线程对象
draw2 = Drawing(80, a1)
draw1.start()
draw2.start()
执行结果
可以看出, 没有线程同步机制,两个线程同时操作同一个账户对象,可以从只有100元的账户,轻松取出80*2=160元,账户余额变为-60. 但银行一般不会同意用户账户为负的.
互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作
注意:
threading
模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁实现步骤
lock1.acquire()
获得锁之后进行加锁, 然后调用 lock1.release()
释放锁实操代码
from threading import Thread, Lock
from time import sleep
class Account:
def __init__(self, money, name):
self.money = money
self.name = name
# 模拟提款操作
class Drawing(Thread):
def __init__(self, drawingNum, account):
Thread.__init__(self)
self.drawingNum = drawingNum
self.account = account
self.expenseTotal = 0
def run(self):
# 获得锁
lock1.acquire()
if self.account.money - self.drawingNum < 0:
return
sleep(1) # 判断完后阻塞。其他线程开始运行
self.account.money -= self.drawingNum
self.expenseTotal += self.drawingNum
# 释放锁
lock1.release()
print(f"账户:{self.account.name},余额是:{self.account.money}")
print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")
if __name__ == "__main__":
a1 = Account(100, "timepause")
# 创建互斥锁
lock1 = Lock()
# 定义取钱线程对象
draw1 = Drawing(80, a1)
# 定义取钱线程对象
draw2 = Drawing(80, a1)
draw1.start()
draw2.start()
执行结果
可以看到, 加锁之后, 解决了因线程未同步引起的结果紊乱的问题
互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问? 这时候可以使用信号量. 信号量控制同时访问资源的数量. 信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
应用场景
实操代码
from threading import Lock, Semaphore, Thread
from time import sleep
def home(name, se):
"""
信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
"""
# 拿到信号量
se.acquire()
print(f"{name}进入了厕所")
sleep(3)
print(f'******************{name}走出了厕所')
# 归还信号量
se.release()
if __name__ == "__main__":
"""
一个房间一次只允许两个人使用: 若不使用信号量,会造成所有人都进入这个房子; 若只允许一人通过可以用锁Lock()
"""
# 创建信号量的对象,有两个坑位
se = Semaphore(2)
print("欢迎光临本厕所, 目前只有两个坑位(联系商务即可扩增~~~)")
for i in range(7):
p = Thread(target=home, args=(f'tom{i}', se)) # 前面加f的作用是令{}里面的变量引用可以生效
p.start()
执行结果
可以看到, 在使用信号量之后, 每次只能有两个人使用厕所. 在释放信号量之后, 后面的人可以继续使用厕所
在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的
实操代码
from threading import Lock, Thread
from time import sleep
def run1():
# 获取锁
lock1.acquire()
print("幸平创真拿到菜刀")
# lock1.release()
# print("幸平创真释放菜刀")
lock2.acquire()
print("幸平创真拿到锅")
# lock2.release()
# print("幸平创真释放锅")
def run2():
lock2.acquire()
print("卫宫士郎拿到锅")
# lock2.release()
# print("卫宫士郎释放锅")
lock1.acquire()
print("卫宫士郎拿到菜刀")
# lock1.release()
# print("卫宫士郎释放菜刀")
if __name__ == "__main__":
"""
死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题思路很简单,就是:同一个代码块,不要同时持有两个对象锁
"""
lock1 = Lock()
lock2 = Lock()
t1 = Thread(target=run1)
t2 = Thread(target=run2)
t1.start()
t2.start()
执行结果
可以看到, 在幸平创真拿到菜刀, 卫宫士郎拿到锅之后, 两人都不愿意释放手中的家伙, 因此无法继续进行做饭, 线程阻塞
放开注释的代码段之后, 由于每个代码块支持有一个对象锁, 因此不会发送死锁
多线程环境下,我们经常需要多个线程的并发和协作. 这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式
优点:
缓冲区和queue对象
从一个线程向另一个线程发送数据最安全的方式可能就是使用queue 库中的队列了。 创建一个被多个线程共享的 Queue对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。 Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据 Queue对象类似进程中的Manager管理器, 本质都是创建了共享数据, 然后在不同进程/线程之间共享
实现步骤:
queue.put()
将数据放入queue队列, 消费者通过调用queue.get()
获取queue中的数据实操代码
import queue
from threading import Thread
from time import sleep
def producer():
num = 1
while True:
if queue.qsize() < 5:
print(f"生产:{num}号消息")
queue.put(f"消费: {num}号消息")
num += 1
else:
print("消息队列已满, 等待被消费")
sleep(1)
def consumer():
while True:
print(f"获取消息:{queue.get()}")
sleep(1)
if __name__ == "__main__":
queue = queue.Queue()
# 注意这里 target=函数名, 而不是 target=函数名()
t = Thread(target=producer)
t.start()
c = Thread(target=consumer)
c.start()
c2 = Thread(target=consumer)
c2.start()
执行结果
协程也叫作纤程(Fiber),是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理. 我们可以将协程理解为运行在线程上的代码块, 协程挂起并不会引起线程阻塞, 他的作用是提高线程的利用率… 协程之间可以依靠邮箱来进行通信和数据共享, 了避免内存共享数据而带来的线程安全问题. 因为其轻量和高利用率的特点, 即使创建上千个线程也不会对系统造成很大负担, 而线程则恰恰相反. 协程是一种设计思想,不仅仅局限于某一门语言. 在Go, Java, Python 等语言中均有实现
协程的优点
协程的缺点
实操代码
不使用协程时
# 不使用协程执行多个任务
import time
def fun1():
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
time.sleep(1)
return "fun1执行完毕"
def fun2():
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
time.sleep(1)
return "fun2执行完毕"
def main():
fun1()
fun2()
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f"耗时{end_time - start_time}") # 不使用协程,耗时6秒
使用使用yield协程,实现任务切换
# 不使用协程执行多个任务
import time
def fun1():
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
yield # 只要方法包含了yield,就变成一个生成器
time.sleep(1)
return "fun1执行完毕"
def fun2():
g = fun1() # fun1是一个生成器,fun1()就不会直接调用,需要通过next()或for循环调用
print(type(g))
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
next(g) # 继续执行fun1的代码
time.sleep(1)
return "fun2执行完毕"
def main():
fun1()
fun2()
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
print(f"耗时{end_time - start_time}") # 耗时5.0秒,效率差别不大
使用asyncio异步IO的典型使用方式实现协程
实现步骤:
await asyncio.gather(fun1(), fun2())
asyncio.run(main())
来运行main方法实操代码
# 不使用协程执行多个任务
import asyncio
import time
async def fun1(): # async表示方法是异步的
for i in range(3):
print(f'原子弹:第{i}次爆炸啦')
# await异步执行func1方法
await asyncio.sleep(1)
return "fun1执行完毕"
async def fun2():
for k in range(3):
print(f'氢弹:第{k}次爆炸了')
# await异步执行func2方法
await asyncio.sleep(1)
return "fun2执行完毕"
async def main():
res = await asyncio.gather(fun1(), fun2())
# 返回值为函数的返回值列表,本例为["func1执行完毕", "func2执行完毕"]
print(res)
if __name__ == "__main__":
start_time = time.time()
asyncio.run(main())
end_time = time.time()
print(f"耗时{end_time - start_time}") # 耗时3秒,效率极大提高
实操结果
从这里可以看到, 这里进用了3.02s左右, 只比理论最短用时3s多了0.02s左右, 从这里可以看出使用协程的巨大优势