并行: 并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )
并发: 并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。
并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。
作用:让加锁的部分由并发变成串行,牺牲了执行效率,保证了数据安全。
应用:在程序使用同一份数据时,就会引发数据安全和数据混乱等问题,需要使用锁来维持数据的顺序取用。
下面的小程序模拟抢票软件,对票数进行修改
#查看余票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock
#查看余票
def search(user):
#打开data文件查看余票
with open('data.txt','r',encoding='utf-8') as f:
dic = json.load(f)
print(f'用户{user}查看余票,还剩{dic.get("ticket_num")}')
#抢票功能
def buy(user):
with open('data.txt','r',encoding='utf-8') as f :
dic = json.load(f)
if dic.get("ticket_num")>0:
dic["ticket_num"] -= 1
with open('data.txt','w',encoding='utf-8') as f:
json.dump(dic,f)
print(f'用户{user}抢票成功!')
else:
print(f'用户{user}抢票失败')
#开始抢票
def run(user,mutex):
search(user)
mutex.acquire()
buy(user)
mutex.release()
if __name__ == '__main__':
#调用Lock类实例化一个所对象
mutex = Lock()
# mutex.acquire()#加锁
# mutex.release()#释放锁
for i in range(10):
#并发十个子进程
p = Process(target=run,args=(f'{i}',mutex))
p.start()
用户1查看余票,还剩6
用户1抢票成功!
用户0查看余票,还剩5
用户0抢票成功!
用户2查看余票,还剩4
用户2抢票成功!
用户3查看余票,还剩3
用户3抢票成功!
用户4查看余票,还剩2
用户4抢票成功!
用户6查看余票,还剩1
用户6抢票成功!
用户5查看余票,还剩0
用户5抢票失败
用户7查看余票,还剩0
用户7抢票失败
用户9查看余票,还剩0
用户9抢票失败
用户8查看余票,还剩0
用户8抢票失败
#这里如果不使用互斥锁就会导致票数和抢到的人数不符。
原则:先进先出(堆栈,先进后出)
相当于内存中产生一个队列空间,可以存放多个数据,但是数据是先进去的先被取出来。
Queue是多进程的列队,可以实现多进程间的数据传递。
Queue([maxsize])
:创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:
q.get( [ block [ ,timeout ] ] )
:返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.get_nowait()
:同q.get(False)
方法。
q.put(item [, block [,timeout ] ] )
:将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
:返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.empty()
:如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。
q.full()
:如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()
方法)。
q.close()
:关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()
操作上,关闭生产者中的队列不会导致get()
方法返回错误。
q.cancel_join_thread()
:不会再进程退出时自动连接后台线程。这可以防止join_thread()
方法阻塞。
q.join_thread()
:连接队列的后台线程。此方法用于在调用q.close()
方法后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread()
方法可以禁止这种行为。
from multiprocessing import Process,Queue
def test1(q):
data = '数据hello'
q.put(data)#向队列中添加数据,如果列队已经填满则会卡在这里不会往下执行,直到列队空出位置让其把数据放进去
print('进程1开始添加数据到列队中。。')
def test2(q):
data = q.get()#从队列中取出数据,如果列队中已经没有数据给它,也会卡住
#q.get_nowait()#如果获取不到数据就报错
print(f'进程2从队列中获取数据{data}')
#q.empty()#判断列队是否为空,返回bool值
#q.full()#判断列队是否满了,返回bool值
if __name__ == '__main__':
q = Queue(2)#括号内填队列中可以存放元素的个数,不填默认为无限大
p1 = Process(target=test1,args=(q,))
p2 = Process(target=test2,args=(q,))
p1.start()
p2.start()
p2.join()
print('主程序')
生产者:生产数据的
消费者:使用数据的
在程序中,生产者把数据添加到队列中,消费者从队列中获取数据。
from multiprocessing import Queue,Process
import time
def producer(name,food,q):
for i in range(9):
data = food,i
msg = f'用户{name}开始制作{data}'
print(msg)
q.put(data)
time.sleep(0.1)#由于cup执行速度太快,这里加个延时,让两个消费者都能抢到CPU的使用权
def consumer(name,q):
while True:
data = q.get()
if not data:
break
print(f'用户{name}开始吃{data}')
if __name__ == '__main__':
q = Queue()
#创造生产者
p1 = Process(target=producer,args=('tank','油条',q))
p2 = Process(target=producer,args=('小明','馒头',q))
#消费者
c1 = Process(target=consumer,args=('tom',q))
c2 = Process(target=consumer,args=('juery',q))
p1.start()
p2.start()
c1.daemon = True
c2.daemon = True#为消费者添加守护进程,主程序完成就结束掉
c1.start()
c2.start()
p2.join()#这里的目的是当生产者p2等消费者吃完再结束,给主程序加延时也能达到同样的效果
#time.sleep(2)
print('主程序')
线程(thread)是操作系统能够进行运算调度的最小单位。它包含在进程之中,是进程的实际运行单位。一条线程指的是进程中一个单一顺序控制流,一个进程可以并发多个线程,每条线程并发执行不同的任务。同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述和信号处理等等。但是同一进程中的多个线程有各自的调用栈,自己的寄存器环境,自己的进程本地存储。
在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责I/O处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率,进一步提高系统的并发性。
进程与线程的区别:
进程是系统进行资源分配和调度的基本单位,线程是是操作系统能够进行运算调度的最小单位。线程包含在进程之中,是进程的实际运行单位。
为什么要使用线程?
线程进一步提高了CPU的使用效率。
注意:线程不能实现并行,只能实现并发。
注意:进程是资源分配的最小单位,线程是CPU调度的最小单位。每一个进程中至少有一个线程。
开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。
多个线程共享同一个进程的地址空间中的资源,是对一台计算机上多个进程的模拟,有时也称线程为轻量级的进程。
而对一台计算机上多个进程,则共享物理内存、磁盘、打印机等其他物理资源。多线程的运行也多进程的运行类似,是CPU在多个线程之间的快速切换。
不同的进程之间是充满敌意的,彼此是抢占、竞争CPU的关系,如果迅雷会和QQ抢资源。而同一个进程是由一个程序员的程序创建,所以同一进程内的线程是合作关系,一个线程可以访问另外一个线程的内存地址,大家都是共享的,一个线程干死了另外一个线程的内存,那纯属程序员脑子有问题。
类似于进程,每个线程也有自己的堆栈,不同于进程,线程库无法利用时钟中断强制线程让出CPU,可以调用thread_yield运行线程自动放弃CPU,让另外一个线程运行。
线程通常是有益的,但是带来了不小程序设计难度,线程的问题是:
因此,在多线程的代码中,需要更多的心思来设计程序的逻辑、保护程序的数据。
线程的实现可以分为两类:用户级线程(User-Level Thread)和内核线线程(Kernel-Level Thread),后者又称为内核支持的线程或轻量级进程。在多线程操作系统中,各个系统的实现方式并不相同,在有的系统中实现了用户级线程,有的系统中实现了内核级线程。
内核的切换由用户态程序自己控制内核切换,不需要内核干涉,少了进出内核态的消耗,但不能很好的利用多核CPU。
在用户空间模拟操作系统对进程的调度,来调用一个进程中的线程,每个进程中都会有一个运行时系统,用来调度线程。此时当该进程获取CPU时,进程内再调度出一个线程去执行,同一时刻只有一个线程执行。
内核级线程:切换由内核控制,当线程进行切换的时候,由用户态转化为内核态。切换完毕要从内核态返回用户态;可以很好的利用smp,即利用多核CPU。windows线程就是这样的。
优点:当有多个处理机时,一个进程的多个线程可以同时执行。
缺点:由内核进行调度。
用户级与内核级的多路复用,内核同一调度内核线程,每个内核线程对应n个用户线程。
历史:在内核2.6以前的调度实体都是进程,内核并没有真正支持线程。它是能过一个系统调用clone()来实现的,这个调用创建了一份调用进程的拷贝,跟fork()不同的是,这份进程拷贝完全共享了调用进程的地址空间。LinuxThread就是通过这个系统调用来提供线程在内核级的支持的(许多以前的线程实现都完全是在用户态,内核根本不知道线程的存在)。非常不幸的是,这种方法有相当多的地方没有遵循POSIX标准,特别是在信号处理,调度,进程间通信原语等方面。
很显然,为了改进LinuxThread必须得到内核的支持,并且需要重写线程库。为了实现这个需求,开始有两个相互竞争的项目:IBM启动的NGTP(Next Generation POSIX Threads)项目,以及Redhat公司的NPTL。在2003年的年中,IBM放弃了NGTP,也就是大约那时,Redhat发布了最初的NPTL。
NPTL最开始在redhat linux 9里发布,现在从RHEL3起内核2.6起都支持NPTL,并且完全成了GNU C库的一部分。
设计:NPTL使用了跟LinuxThread相同的办法,在内核里面线程仍然被当作是一个进程,并且仍然使用了clone()系统调用(在NPTL库里调用)。但是,NPTL需要内核级的特殊支持来实现,比如需要挂起然后再唤醒线程的线程同步原语futex.
NPTL也是一个1*1的线程库,就是说,当你使用pthread_create()调用创建一个线程后,在内核里就相应创建了一个调度实体,在linux里就是一个新进程,这个方法最大可能的简化了线程的实现。
除NPTL的11模型外还有一个mn模型,通常这种模型的用户线程数会比内核的调度实体多。在这种实现里,线程库本身必须去处理可能存在的调度,这样在线程库内部的上下文切换通常都会相当的快,因为它避免了系统调用转到内核态。然而这种模型增加了线程实现的复杂性,并可能出现诸如优先级反转的问题,此外,用户态的调度如何跟内核态的调度进行协调也是很难让人满意。
Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。
对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
在多线程环境中,Python 虚拟机按以下方式执行:
在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。
如果要创建多个线程可以使用for循环
from threading import Thread
import time
def task():
print('线程开启')
time.sleep(1)
print('线程结束')
if __name__ == '__main__':
t = Thread(target=task)#实例化线程对象,可以在主程序进行,也可以不再主程序进行
t.start()
from threading import Thread
import time
class MyThresd(Thread):
def run(self):
print('线程开启')
time.sleep(1)
print('线程结束')
t = MyThresd()
t.start()
线程的属性和进程的属性有些相似,功能也相似。
Thread实例对象的方法:
getName()
:返回线程名。setName()
:设置线程名。threading模块提供的一些方法:
threading.currentThread()
:返回当前的线程变量。threading.enumerate()
:返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。from threading import Thread
from threading import current_thread
import time
def task():
print(f'线程开启{current_thread().name}')
time.sleep(3)
print(f'线程结束{current_thread().name}')
if __name__ == '__main__':
for i in range(3):
t = Thread(target=task)
t.start()
print(t.isAlive())
print(t.is_alive())
t1 = Thread(target=task)
t1.daemon = True
t1.start()#这里t1能否正常结束就看t1能否快速的抢到CPU执行自己的代码了,能抢到则可以正常打印“线程结束”,否则就被主程序结束掉了
print('主线程')
线程开启Thread-1
True
True
线程开启Thread-2
True
True
线程开启Thread-3
True
True
线程开启Thread-4
主线程
线程结束Thread-1
线程结束Thread-3
线程结束Thread-2
线程互斥锁和进程互斥锁的作用是一样的,用法也很相似,在需要保护数据的地方加锁就可以了。
from threading import Thread,Lock
import time
mutex = Lock()
n = 100
def task(i):
print(f'线程{i}启动。。')
global n
mutex.acquire()#获取,加锁
temp = n
time.sleep(0.1)
n = temp - 1
print(n)
mutex.release()#释放
#如果不加锁,么个线程获取到的值都是100,所有程序都在执行100-1的操作,加锁之后,每个线程获取到的数据是前一个线程计算完成的结果
if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=task,args=(i,))
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)