前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python升级之路( Lv15 ) 并发编程三剑客: 进程, 线程与协程

Python升级之路( Lv15 ) 并发编程三剑客: 进程, 线程与协程

作者头像
时间静止不是简史
发布2022-09-27 17:41:13
6300
发布2022-09-27 17:41:13
举报
文章被收录于专栏:Java探索之路

Python系列文章目录

第一章 Python 入门

第二章 Python基本概念

第三章 序列

第四章 控制语句

第五章 函数

第六章 面向对象基础

第七章 面向对象深入

第八章 异常机制

第九章 文件操作

第十章 模块

第十一章 GUI图形界面编程

第十二章 pygame游戏开发基础

第十三章 pyinstaller 使用详解

第十四章 并发编程初识

第十五章 并发编程三剑客-进程, 线程与协程

进程, 线程与协程

进程

书写上文, 我们可以知道: 进程是资源(CPU、内存等)分配的基本单位,它是程序执行时的一个实例. 程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行. 进程切换需要的资源很最大,效率低。

创建方式

进程创建有两种方式: 方法包装和类包装. 熟悉Java的人可能会发现, 类包装更符合我们原来的书写习惯 创建进程后, 我们使用start() 来启动进程

类包装

主要步骤:

  • 定义一个进程类, 并修改初始化构造, 改为有参构造
  • 创建进程时, 传入初始化方法中添加的参数即可

实操代码

代码语言:javascript
复制
import time
from multiprocessing import Process


class MyProcess(Process):
    """进程的创建方式: 2.类包装"""
    def __init__(self, name):
        Process.__init__(self)
        self.name = name

    def run(self):
        print(f"进程{self.name} 启动")
        time.sleep(3)
        print(f"进程{self.name} 结束")


if __name__ == "__main__":
    print("创建进程")
    p1 = MyProcess("p1")
    p2 = MyProcess("p2")
    p1.start()
    p2.start()

执行结果

方法包装

主要步骤:

  • 在创建进程时: 已默认值参数的方式声明目标函数, 以及传入目标函数的参数(元组的方式)

实操代码

代码语言:javascript
复制
import os
import time
from multiprocessing import Process


def function(name):
    """进程的创建方式: 1.方法包装"""
    print("当前进程ID:", os.getpid())
    print("父进程ID", os.getppid())
    print(f"Process:{name} start")
    time.sleep(3)
    print(f"Process:{name} end")


if __name__ == "__main__":
    print("当前main进程ID: ", os.getppid())
    # 创建进程
    p1 = Process(target=function, args=("p1",))
    p2 = Process(target=function, args=("p2",))
    p1.start()
    p2.start()

执行结果

多说一句

元组中如果只有一个元素, 是需要加逗号的!!! 这是因为括号( )既可以表示tuple,又可以表示数学公式中的小括号, 所以如果没有加逗号,那你里面放什么类型的数据那么类型就会是什么.

进程间通信方式

进程间通信有两种方式: Queue队列和Pipe管道方式

Queue队列

  • 要实现进程间通信,需要使用 multiprocessing 模块中的 Queue 类
  • 进程间通信的方式,就是使用了操作系统给开辟的一个队列空间,各个进程可以把数据放到该队列中,当然也可以从队列中把自己需要的信息取走

实现核心:

  • 这里利用类包装的方式, 并且添加了一个参数mq
  • 主函数声明一个Queue队列, 放入需要通信的消息
  • 在需要调用时, 利用mq,get 获取当前进程所传入的值

实操代码

代码语言:javascript
复制
from multiprocessing import Process, Queue


class MyProcess(Process):
    def __init__(self, name, mq):
        Process.__init__(self)
        self.name = name
        self.mq = mq

    def run(self):
        print("Process {} started".format(self.name))
        print("===Queue", self.mq.get(), "===")
        self.mq.put(self.name)
        print("Process {} end".format(self.name))


if __name__ == "__main__":
    # 创建进程列表
    t_list = []
    mq = Queue()
    mq.put("1")
    mq.put("2")
    mq.put("3")
    # 利用range序列重复创建进程
    for i in range(3):
        t = MyProcess("p{}".format(i), mq)
        t.start()
        t_list.append(t)
    # 等待进程结束
    for t in t_list:
        t.join()
    print(mq.get())
    print(mq.get())
    print(mq.get())

执行结果

Pipe管道

Pipe 直译过来的意思是“管”或“管道”,和实际生活中的管(管道)是非常类似的. Pipe方法返回(conn1, conn2)代表一个管道的两个端.

  • Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个参数是全双工模式,也就是说conn1和conn2均可收发
  • 若duplex为False,conn1只负责接收消息,conn2只负责发送消息. send和recv方法分别是发送和接受消息的方法. 例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息.
  • 如果没有消息可接收,recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError

实现核心

  • 主函数声明管道的两端 conn1, conn2 = multiprocessing.Pipe()
  • 以方法包装方式创建进程后, 在对应方法中调用管道的两端调用消息收发的方法 conn1.send/conn1.recv

实操代码

代码语言:javascript
复制
import multiprocessing
import time


def fun1(conn1):
    """
    管道结构
    进程<==>conn1(管道头)==pipe==conn2(管道尾)<==>进程2
    """
    sub_info = "进程向conn1发送消息, 管道另一头conn2 可以接收到消息"
    print(f"进程1--{multiprocessing.current_process().pid}发送数据:{sub_info}")
    time.sleep(1)
    conn1.send(sub_info)        # 调用conn1.send发送消息, 发送的消息会被管道的另一头接收
    print(f"conn1接收消息:{conn1.recv()}")    # conn1.recv接收消息, 如果没有消息可接收, recv方法会一直阻塞. 如果管道已经被关闭,那么recv方法会抛出EOFError
    time.sleep(1)


def fun2(conn2):
    sub_info = "进程向conn2发送消息, 管道另一头conn1 可以接收到消息"
    print(f"进程2--{multiprocessing.current_process().pid}发送数据:{sub_info}")
    time.sleep(1)
    conn2.send(sub_info)
    print(f"conn2接收消息:{conn2.recv()}")
    time.sleep(1)


if __name__ == "__main__":
    # 创建管道
    # Pipe方法返回(conn1, conn2)代表一个管道的两个端.如果conn1带表头, conn2代表尾, conn1发送的消息只会被conn2接收, 同理conn2发送的消息也只会被conn1接收
    conn1, conn2 = multiprocessing.Pipe()
    # 创建子进程
    # Python中,圆括号意味着调用函数. 在没有圆括号的情况下,Python会把函数当做普通对象
    process1 = multiprocessing.Process(target=fun1, args=(conn1,))
    process2 = multiprocessing.Process(target=fun2, args=(conn2,))
    # 启动子进程
    process1.start()
    process2.start()

Manager管理器

管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享

实现核心

  • 创建进程
  • 利用Manager创建字典, 列表等对象, 传入进程
  • 在各进程所对应的方法中修改上面创建的对象

实操代码

代码语言:javascript
复制
from multiprocessing import Manager, Process


def func1(name,m_list,m_dict):
     m_dict['area'] = '罗布泊'
     m_list.append('钱三强')


def func2(name, m_list, m_dict):
    m_dict['work'] = '造核弹'
    m_list.append('邓稼先')


if __name__ == "__main__":
    with Manager() as mgr:
        m_list = mgr.list()
        m_dict = mgr.dict()
        m_list.append("钱学森")
        # 两个进程不能直接互相使用对象,需要互相传递
        p1 = Process(target=func1, args=('p1', m_list, m_dict))
        p1.start()
        p1.join()   # 等p1进程结束,主进程继续执行
        print(m_list)
        print(m_dict)
        p2 = Process(target=func2, args=('p1', m_list, m_dict))
        p2.start()
        p2.join()   # 等p2进程结束,主进程继续执行
        print(m_list)
        print(m_dict)

执行结果

太过机密, 不予展示

进程池(Pool)

进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池未满,则会创建一个新的进程用来执行该请求; 反之,如果池中的进程数已经达到规定最大值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行 使用进程池的好处就是可以节约内存空间, 提高资源利用率

进程池相关方法

类/方法

功能

参数

Pool(processes)

创建进程池对象

processes表示进程池中有多少进程

pool.apply_async(func,args,kwds)

异步执行;将事件放入到进程池队列

func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参返回值:返回一个代表进程池事件的对象,通过返回值的get方法可以得到事件函数的返回值

pool.apply(func,args,kwds)

同步执行;将事件放入到进程池队列

func 事件函数 args 以元组形式给func传参kwds 以字典形式给func传参

pool.close()

关闭进程池

pool.join()

关闭进程池

pool.map(func,iter)

类似于python的map函数,将要做的事件放入进程池

func 要执行的函数 iter 迭代对象

实现核心

  • 创建和初始化进程池
  • 以方法包装的方式传入相关参数, 并调用相关api

实操代码

代码语言:javascript
复制
from multiprocessing import Pool
import os
from time import sleep


def func1(name):
    print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name


def func2(args):
    print("方法2输出: ", args)


if __name__ == "__main__":
    pool = Pool(5)
    pool.apply_async(func=func1, args=('进程1',), callback=func2)
    pool.apply_async(func=func1, args=('进程2',), callback=func2)
    pool.apply_async(func=func1, args=('进程3',), callback=func2)
    pool.apply_async(func=func1, args=('进程4',))
    pool.apply_async(func=func1, args=('进程5',))
    pool.apply_async(func=func1, args=('进程6',))
    pool.apply_async(func=func1, args=('进程7',))
    pool.close()
    pool.join()

执行结果

使用with管理进程池

使用with 方法, 可以进行优雅的进行资源管理. 在这里是可以帮助我们优雅的关闭线程池

关于with方法

with所求值的对象必须有一个enter()方法,一个exit()方法. 紧跟with后面的语句被求值后,返回对象的__enter__()方法被调用, 这个方法的返回值将被赋值给as后面的变量。当with后面的代码块全部被执行完之后,将调用前面返回对象的exit()方法

实操代码

代码语言:javascript
复制
from multiprocessing import Pool
import os
from time import sleep


def func1(name):
    print(f"方法1输出: 当前进程的ID:{os.getpid()},{name}")
    sleep(2)
    return name


if __name__ == "__main__":
    with Pool(5) as pool:
        args = pool.map(func1, ('进程1,', '进程2,', '进程3,', '进程4,', '进程5,', '进程6,', '进程7,', '进程8,'))
    for a in args:
        print(a)

执行结果

线程

线程是程序执行时的最小单位,也是CPU调度和分派的基本单位. 一个进程可以由很多个线程组成,每个线程拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度 线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行. 同样多线程也可以实现并发操作,每个请求分配一个线程来处理.

创建方式

Python的标准库提供了两个模块: _threadthreading_thread 是低级模块, threading 是高级模块, 是对 _thread 进行了封装。多数情况下,我们只需要使用 threading 这个高级模块. 而线程的创建的方式有两种: 一种是方法包装, 一种是类包装

方法包装

主要步骤:

  • 在创建进程时: 已默认值参数的方式声明目标函数, 以及传入目标函数的参数(元组的方式)

实操代码

代码语言:javascript
复制
from threading import Thread
from time import sleep


def func1(name):
    """线程创建方式(方法包装)"""
    for i in range(3):
        print(f"thread:{name} : {i}")
        sleep(1)


if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程
    t1 = Thread(target=func1, args=("t1",))
    t2 = Thread(target=func1, args=("t2",))
    # 启动线程
    t1.start()
    t2.start()
    print("主线程, end")
"""
注意: 运行结果可能会出现换行问题,是因为多个线程抢夺控制台输出的IO流
"""

结果展示

类包装

主要步骤

  • 定义一个线程类, 并修改初始化构造, 改为有参构造
  • 创建线程时, 传入初始化方法中添加的参数即可

实操代码

代码语言:javascript
复制
from threading import Thread
from time import sleep


class MyThread(Thread):
    """
    类的方式创建线程(类包装)
    """

    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(f"thread : {self.name} : {i}")
            sleep(1)


if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    # 启动线程
    t1.start()
    t2.start()
    print("主线程, end")

结果展示

从结果可以看出, 启动线程对应的方法是异步的

Join

由上图可知,主线程不会等待子线程结束(异步). 如果需要等待子线程结束后,再结束主线程,可使用join()方法

实操代码

代码语言:javascript
复制
if __name__ == "__main__":
    """新建两个线程, 每个线程每隔1秒打印一次消息, 总共打印3次"""
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    # 启动线程
    t1.start()
    t2.start()
    # 主线程等t1, t2结束之后, 再往下执行
    t1.join()
    t2.join()
    print("主线程, end")

守护线程

在行为上还有一种叫守护线程,主要的特征是在它的生命周期中主线程死亡,它也就随之死亡. 在python中,线程通过 setDaemon(True|False)来设置是否为守护线程.

实操代码

代码语言:javascript
复制
from threading import Thread
from time import sleep


class MyThread(Thread):
    """守护线程"""
    def __init__(self, name):
        Thread.__init__(self)
        self.name = name

    def run(self):
        for i in range(3):
            print(f"thread : {self.name} : {i}")
            sleep(1)


if __name__ == "__main__":
    print("主线程, start")
    # 创建线程(类创建的方式)
    t1 = MyThread("t1")
    # 设置为守护线程
    # t1.setDaemon(True)  # 该方法3.10后被废弃, 可以直接按照下面的方法设置
    t1.daemon = True
    # 启动线程
    t1.start()
    print("主线程, end")

执行结果

从结果可以看出, 主线程不会等待子线程结束(也就是执行循环体的后续循环),

因为子线程被设置成守护线程, 因此主线程执行完毕后子线程就会停止执行

Event 事件

事件Event主要用于唤醒正在阻塞等待状态的线程

注意:

  • Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生. 在初始情况下,event 对象中的信号标志被设置假.
  • 如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真. 一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程
  • 如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行

Event() 可以创建一个事件管理标志,该标志(event)默认为False,event对象主要有四种方法可以调用:

方法名

说明

event.wait(timeout=None)

调用该方法的线程会被阻塞,如果设置了timeout参数,超时后,线程会停止阻塞继续执行;

event.set()

将event的标志设置为True,调用wait方法的所有线程将被唤醒

event.clear()

将event的标志设置为False,调用wait方法的所有线程将被阻塞

event.is_set()

判断event的标志是否为True

实操代码

代码语言:javascript
复制
import time
import threading


def chihuoguo(name):
    """利用事件实现就餐, 吃饭"""
    # 等待时间, 进入阻塞等待状态
    print(f'{name} 已经到餐厅')
    print(f'小伙伴{name} 已经进入就餐状态!')
    event.wait()
    print(f'{name} 收到通知了.')
    print(f'小伙伴{name} 开始吃咯!')


if __name__ == "__main__":
    event = threading.Event()
    # 创建新线程
    threa1 = threading.Thread(target=chihuoguo, args=("玛奇玛",))
    threa2 = threading.Thread(target=chihuoguo, args=("电次",))
    # 开启线程
    threa1.start()
    threa2.start()
    time.sleep(10)
    # 发送时间通知
    print('---->>>主线程(波吉塔)通知小伙伴开吃咯!')
    event.set()

执行结果

  1. 可以看到, 启动两个线程后, 在调用 event.wait() 之前的代码段都能够正常执行, 但只有事件标识被设置为 true 时event.set(), event.wait()之后的代码才会被执行
  1. 可以看到, 我们将 event.wait() 这段代码注释后, 由于主线程没有变更事件的标识, 即事件标识一直为false 因此程序会一直阻塞直至该标识为true

线程锁

在python中,无论你有多少核,在Cpython解释器中永远都是假象. 无论你是4核,8核,还是16核,同一时间执行的线程只有一个线程. 这个是python开发设计的一个缺陷,所以说python中的线程是“含有水分的线程”.

全局锁GIL问题

全局锁, 又称GIL, 全称是: Global Interpreter Lock. 他是用来保证同一时刻只有一个线程在运行.

注意:

  • GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念,同样一段代码可以通过PyPy,Psyco等除CPython外, 不同的Python执行环境来执行,就没有GIL的问题。
  • 因为CPython是大部分环境下默认的Python执行环境. 所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷

线程同步和互斥锁

线程同步

处理多线程问题时,多个线程访问同一个对象,并且某些线程还想修改这个对象. 这时候就需要用到“线程同步”. 线程同步其实就是一种等待机制,多个需要同时访问此对象的线程进入这个对象的等待池形成队列,等待前面的线程使用完毕后,下一个线程再使用

实操代码

  1. 多线程操作同一个对象(未使用线程同步)
代码语言:javascript
复制
from threading import Thread
from time import sleep


class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name


# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        if self.account.money - self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == "__main__":
    a1 = Account(100, "timepause")
    # 定义取钱线程对象
    draw1 = Drawing(80, a1)
    # 定义取钱线程对象
    draw2 = Drawing(80, a1)
    draw1.start()
    draw2.start()

执行结果

可以看出, 没有线程同步机制,两个线程同时操作同一个账户对象,可以从只有100元的账户,轻松取出80*2=160元,账户余额变为-60. 但银行一般不会同意用户账户为负的.

  1. 我们可以通过“锁机制”来实现线程同步问题,锁机制有如下几个要点:
  2. 必须使用同一个锁对象
  3. 互斥锁的作用就是保证同一时刻只能有一个线程去操作共享数据,保证共享数据不会出现错误问题
  4. 使用互斥锁的好处确保某段关键代码只能由一个线程从头到尾完整地去执行
  5. 使用互斥锁会影响代码的执行效率
  6. 同时持有多把锁,容易出现死锁的情况
互斥锁

互斥锁: 对共享数据进行锁定,保证同一时刻只能有一个线程去操作

注意:

  • 互斥锁是多个线程一起去抢,抢到锁的线程先执行,没有抢到锁的线程需要等待,等互斥锁使用完释放后,其它等待的线程再去抢这个锁
  • threading 模块中定义了 Lock 变量,这个变量本质上是一个函数,通过调用这个函数可以获取一把互斥锁

实现步骤

  • 创建互斥锁
  • 在需要加锁的代码段上面使用 lock1.acquire()获得锁之后进行加锁, 然后调用 lock1.release() 释放锁

实操代码

代码语言:javascript
复制
from threading import Thread, Lock
from time import sleep


class Account:
    def __init__(self, money, name):
        self.money = money
        self.name = name


# 模拟提款操作
class Drawing(Thread):
    def __init__(self, drawingNum, account):
        Thread.__init__(self)
        self.drawingNum = drawingNum
        self.account = account
        self.expenseTotal = 0

    def run(self):
        # 获得锁
        lock1.acquire()
        if self.account.money - self.drawingNum < 0:
            return
        sleep(1)  # 判断完后阻塞。其他线程开始运行
        self.account.money -= self.drawingNum
        self.expenseTotal += self.drawingNum
        # 释放锁
        lock1.release()
        print(f"账户:{self.account.name},余额是:{self.account.money}")
        print(f"账户:{self.account.name},总共取了:{self.expenseTotal}")


if __name__ == "__main__":
    a1 = Account(100, "timepause")
    # 创建互斥锁
    lock1 = Lock()
    # 定义取钱线程对象
    draw1 = Drawing(80, a1)
    # 定义取钱线程对象
    draw2 = Drawing(80, a1)
    draw1.start()
    draw2.start()

执行结果

可以看到, 加锁之后, 解决了因线程未同步引起的结果紊乱的问题

Semaphore信号量

互斥锁使用后,一个资源同时只有一个线程访问。如果某个资源,我们同时想让N个(指定数值)线程访问? 这时候可以使用信号量. 信号量控制同时访问资源的数量. 信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过

应用场景

  • 在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件).
  • 在做爬虫抓取数据时
  • 生活中, 在上厕所时, 需要限制同时使用的人数, 因为厕所的坑位是固定的, 超过坑位数的人去使用厕所需要排队.

实操代码

代码语言:javascript
复制
from threading import Lock, Semaphore, Thread
from time import sleep


def home(name, se):
    """
    信号量控制同时访问资源的数量。信号量和锁相似,锁同一时间只允许一个对象(进程)通过,信号量同一时间允许多个对象(进程)通过
    """
    # 拿到信号量
    se.acquire()
    print(f"{name}进入了厕所")
    sleep(3)
    print(f'******************{name}走出了厕所')
    # 归还信号量
    se.release()


if __name__ == "__main__":
    """
    一个房间一次只允许两个人使用: 若不使用信号量,会造成所有人都进入这个房子; 若只允许一人通过可以用锁Lock()
    """
    # 创建信号量的对象,有两个坑位
    se = Semaphore(2)
    print("欢迎光临本厕所, 目前只有两个坑位(联系商务即可扩增~~~)")
    for i in range(7):
        p = Thread(target=home, args=(f'tom{i}', se))  # 前面加f的作用是令{}里面的变量引用可以生效
        p.start()

执行结果

可以看到, 在使用信号量之后, 每次只能有两个人使用厕所. 在释放信号量之后, 后面的人可以继续使用厕所

死锁

在多线程程序中,死锁问题很大一部分是由于一个线程同时获取多个锁造成的

实操代码

代码语言:javascript
复制
from threading import Lock, Thread
from time import sleep


def run1():
    # 获取锁
    lock1.acquire()
    print("幸平创真拿到菜刀")
    # lock1.release()
    # print("幸平创真释放菜刀")
    lock2.acquire()
    print("幸平创真拿到锅")
    # lock2.release()
    # print("幸平创真释放锅")



def run2():
    lock2.acquire()
    print("卫宫士郎拿到锅")
    # lock2.release()
    # print("卫宫士郎释放锅")
    lock1.acquire()
    print("卫宫士郎拿到菜刀")
    # lock1.release()
    # print("卫宫士郎释放菜刀")



if __name__ == "__main__":
    """
    死锁是由于“同步块需要同时持有多个锁造成”的,要解决这个问题思路很简单,就是:同一个代码块,不要同时持有两个对象锁
    """
    lock1 = Lock()
    lock2 = Lock()

    t1 = Thread(target=run1)
    t2 = Thread(target=run2)

    t1.start()
    t2.start()

执行结果

可以看到, 在幸平创真拿到菜刀, 卫宫士郎拿到锅之后, 两人都不愿意释放手中的家伙, 因此无法继续进行做饭, 线程阻塞

放开注释的代码段之后, 由于每个代码块支持有一个对象锁, 因此不会发送死锁

生产者和消费者模式

多线程环境下,我们经常需要多个线程的并发和协作. 这个时候,就需要了解一个重要的多线程并发协作模型“生产者/消费者模式

  • 生产者 生产者指的是负责生产数据的模块(这里模块可能是:方法、对象、线程、进程)
  • 消费者 消费者指的是负责处理数据的模块(这里模块可能是:方法、对象、线程、进程)
  • 缓冲区 存放数据的模块, 生产者将生产好的数据放入“缓冲区”,消费者从“缓冲区”拿要处理的数据

优点:

  • 实现线程的并发协作
  • 异步, 解耦了生产者和消费者
  • 解决忙闲不均,提高效率

缓冲区和queue对象

从一个线程向另一个线程发送数据最安全的方式可能就是使用queue 库中的队列了。 创建一个被多个线程共享的 Queue对象,这些线程通过使用 put() 和 get() 操作来向队列中添加或者删除元素。 Queue对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据 Queue对象类似进程中的Manager管理器, 本质都是创建了共享数据, 然后在不同进程/线程之间共享

实现步骤:

  • 创建queue队列, 创建生产者消费者线程
  • 生产者通过 queue.put()将数据放入queue队列, 消费者通过调用queue.get() 获取queue中的数据

实操代码

代码语言:javascript
复制
import queue
from threading import Thread
from time import sleep


def producer():
    num = 1
    while True:
        if queue.qsize() < 5:
            print(f"生产:{num}号消息")
            queue.put(f"消费: {num}号消息")
            num += 1
        else:
            print("消息队列已满, 等待被消费")
        sleep(1)


def consumer():
    while True:
        print(f"获取消息:{queue.get()}")
        sleep(1)


if __name__ == "__main__":
    queue = queue.Queue()
    # 注意这里 target=函数名, 而不是 target=函数名()
    t = Thread(target=producer)
    t.start()
    c = Thread(target=consumer)
    c.start()
    c2 = Thread(target=consumer)
    c2.start()

执行结果

协程

协程也叫作纤程(Fiber),是一种在线程中,比线程更加轻量级的存在,由程序员自己写程序来管理. 我们可以将协程理解为运行在线程上的代码块, 协程挂起并不会引起线程阻塞, 他的作用是提高线程的利用率… 协程之间可以依靠邮箱来进行通信和数据共享, 了避免内存共享数据而带来的线程安全问题. 因为其轻量和高利用率的特点, 即使创建上千个线程也不会对系统造成很大负担, 而线程则恰恰相反. 协程是一种设计思想,不仅仅局限于某一门语言. 在Go, Java, Python 等语言中均有实现

协程的优点

  • 由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;
  • 无需原子操作的锁定及同步的开销;
  • 方便切换控制流,简化编程模型
  • 单线程内就可以实现并发的效果,最大限度地利用cpu,且可扩展性高,成本低(注:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理)

协程的缺点

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上

协程与线程的比较

  • 在单线程同步模型中,任务按照顺序执行 如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行
  • 多线程模型中,多个任务分别在独立的线程中执行 这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行. 这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行
  • 协程版本的程序中,多个任务交错执行,但仍然在一个单独的线程控制中 当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件. 事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。

asyncio实现协程(重点)

  • 正常的函数执行时是不会中断的,所以你要写一个能够中断的函数,就需要加 async
  • async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他 异步函数,等到挂起条件(假设挂起条件是 sleep(5) )消失后,也就是5秒到了再回来执行
  • await 用来用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂 起,去执行其他的异步程序。
  • asyncio 是python3.5之后的协程模块,是python实现并发重要的包,这个包使用事件循环驱动实 现并发
  • asyncio协程是写爬虫比较好的方式. 比多线程和多进程都好.开辟新的线程和进程是非常耗时的

实操代码

不使用协程时

代码语言:javascript
复制
# 不使用协程执行多个任务
import time


def fun1():
    for i in range(3):
        print(f'原子弹:第{i}次爆炸啦')
        time.sleep(1)
    return "fun1执行完毕"


def fun2():
    for k in range(3):
        print(f'氢弹:第{k}次爆炸了')
        time.sleep(1)
    return "fun2执行完毕"


def main():
    fun1()
    fun2()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    print(f"耗时{end_time - start_time}")  # 不使用协程,耗时6秒

使用使用yield协程,实现任务切换

代码语言:javascript
复制
# 不使用协程执行多个任务
import time


def fun1():
    for i in range(3):
        print(f'原子弹:第{i}次爆炸啦')
        yield   # 只要方法包含了yield,就变成一个生成器
        time.sleep(1)
    return "fun1执行完毕"


def fun2():
    g = fun1()   # fun1是一个生成器,fun1()就不会直接调用,需要通过next()或for循环调用
    print(type(g))
    for k in range(3):
        print(f'氢弹:第{k}次爆炸了')
        next(g)   # 继续执行fun1的代码
        time.sleep(1)
    return "fun2执行完毕"


def main():
    fun1()
    fun2()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    print(f"耗时{end_time - start_time}")  # 耗时5.0秒,效率差别不大

使用asyncio异步IO的典型使用方式实现协程

实现步骤:

  • 创建两个异步方法fun1, fun2
  • 创建一个main方法来管理上面两个异步方法 await asyncio.gather(fun1(), fun2())
  • 主函数中通过 asyncio.run(main()) 来运行main方法

实操代码

代码语言:javascript
复制
# 不使用协程执行多个任务
import asyncio
import time


async def fun1():  # async表示方法是异步的
    for i in range(3):
        print(f'原子弹:第{i}次爆炸啦')
        # await异步执行func1方法
        await asyncio.sleep(1)
    return "fun1执行完毕"


async def fun2():
    for k in range(3):
        print(f'氢弹:第{k}次爆炸了')
        # await异步执行func2方法
        await asyncio.sleep(1)
    return "fun2执行完毕"


async def main():
    res = await asyncio.gather(fun1(), fun2())
    # 返回值为函数的返回值列表,本例为["func1执行完毕", "func2执行完毕"]
    print(res)

if __name__ == "__main__":
    start_time = time.time()
    asyncio.run(main())
    end_time = time.time()
    print(f"耗时{end_time - start_time}")  # 耗时3秒,效率极大提高

实操结果

从这里可以看到, 这里进用了3.02s左右, 只比理论最短用时3s多了0.02s左右, 从这里可以看出使用协程的巨大优势


本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-08-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Python系列文章目录
    • 进程, 线程与协程
    • 进程
      • 创建方式
        • 类包装
        • 方法包装
      • 进程间通信方式
        • Queue队列
        • Pipe管道
      • Manager管理器
        • 进程池(Pool)
          • 使用with管理进程池
      • 线程
        • 创建方式
          • 方法包装
          • 类包装
        • Join
          • 守护线程
            • Event 事件
              • 线程锁
                • 全局锁GIL问题
                • 线程同步和互斥锁
                • Semaphore信号量
                • 死锁
              • 生产者和消费者模式
              • 协程
                • 协程与线程的比较
                  • asyncio实现协程(重点)
                  相关产品与服务
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档