python3--线程,锁,同步锁,递归锁,信号量,事件,条件和定时器,队列,线程池

线程

什么是线程?

线程是cpu调度的最小单位

进程是资源分配的最小单位

进程和线程是什么关系?

  线程是在进程中的 一个执行单位

  多进程 本质上开启的这个进程里就有一个线程

  多线程 单纯的在当前进程中开启了多个线程

线程和进程的区别:

  线程的开启 销毁 任务切换的时间开销小

  在同一个进程中数据共享

  能实现并发,但不能脱离进程

  进程负责管理分配资源 线程负责执行代码

GIL锁 -- 全局解释器锁

同一时刻只能有一个线程访问CPU -- 线程锁

Cpython会受到GIL影响

python程序效率下降

高计算型 -- 多线程会导致程序的效率下降

高IO型的 -- 可以使用多线程

守护线程

守护线程和守护进程的区别

守护进程是等待主进程代码结束之后就结束

守护线程是等待主线程都结束之后才结束

主线程等待其他线程结束,才结束

Thread类的其他方法

Thread实例对象的方法
  isAlive(): 返回线程是否活动的。
  getName(): 返回线程名。
  setName(): 设置线程名。

threading模块提供的一些方法:
  threading.currentThread(): 返回当前的线程变量。
  threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止
  后的线程。
  threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

代码示例

from threading import Thread
import time
def func():
    # 需要延迟一下,不然会先执行print,那么结果会不一样
    time.sleep(0.1)  
    print(123)

t = Thread(target=func)
t.start()
print(t.is_alive())  # 线程是否运行

执行结果

True

123

示例2

from threading import Thread, currentThread, enumerate
import time
def func():
    print('-->', currentThread())   # 打印线程名,线程号
    # 需要延迟一下,不然会先执行print,那么结果会不一样
    time.sleep(0.1)
    print(123)

t = Thread(target=func)
t.start()
print(t.is_alive())  # 线程是否运行
print(t.getName())  # 打印线程名
t.setName('t1')  # 设置线程名
print(t.getName())  # 打印线程名
print(currentThread())  # 打印线程名,线程号
print(enumerate())  # 打印线程列表

执行结果

-->

-->

True

Thread-1

t1

[

123

同步锁

既然有了GIL锁,为什么还有其他锁?

示例代码:

import time
from threading import Thread
def func():
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1

n = 100
t_lst = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
print(n)

执行结果

99

GIL —— 全局解释器锁

锁线程 :在计算的时候 同一时刻只能有一个线程访问CPU

线程锁限制了你对CPU的使用,但是不影响web类或者爬虫类代码的效率

我们可以通过启动多进程的形式来弥补这个问题

为什么是99呢?

每个线程,从进程中取值100,那么CPU计算100-1,结果是99

这个99又赋值给n,进程变量就是99,所以每次都是赋值操作,赋值了100次,最终结果99,这样还是出现数据不安全的情况

如何解决?加锁

错误 : 示例代码如下

import time
from threading import Thread,Lock
def func(lock):
    global n
    lock.acquire()  # 加锁
    temp = n
    time.sleep(0.1)
    n = temp - 1
    lock.release()  # 释放锁(解锁)

n = 100
t_lst = []
for i in range(100):  # 写在for循环里面,等同于创建了100把锁
    lock = Lock()  # 创建对象锁
    t = Thread(target=func,args=(lock,))
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
print(n)

执行结果

99

正确代码示例:

import time
from threading import Thread,Lock
def func(lock):
    global n
    lock.acquire()
    temp = n
    time.sleep(0.1)
    n = temp - 1
    lock.release()

n = 100
t_lst = []
lock = Lock()  # 创建一个对象锁
for i in range(100):
    t = Thread(target=func,args=(lock,))
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
print(n)

执行结果

0

正确写法示例2

import time
from threading import Thread
def func():
    global n
    n = n -1

n = 100
t_lst = []
for i in range(100):
    t = Thread(target=func)
    t.start()
    t_lst.append(t)
for t in t_lst:
    t.join()
print(n)

执行结果,如果把计算和赋值两个步骤拆开,就会出现数据不安全的情况

0

总结:线程也需要锁,针对上面这张情况,需要加锁,这种锁,叫做同步锁

互斥锁

在同一个线程中,能够被一个锁的多个acquire阻塞住了,这种锁就叫互斥锁

from threading import Lock
lock = Lock()  # 在同一个线程中
               # 能够被一个锁的多个acquire阻塞住了
               # 这种锁就叫互斥锁
lock.acquire()
lock.acquire()

执行结果

阻塞住了......

死锁

进程也有死锁与递归锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

示例代码:

from threading import Lock
import time
mutexA = Lock()
mutexA.acquire()
mutexA.acquire()
time.sleep(0.1)
print(123)
mutexA.acquire()
mutexA.acquire()

执行结果

阻塞住......

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

示例代码:

from threading import RLock
import time
mutexA = RLock()
mutexA.acquire()
mutexA.acquire()
time.sleep(0.1)
print(123)
mutexA.acquire()
mutexA.acquire()

执行结果

123

递归锁 -- RLock

典型问题:科学家吃面

形成死锁原因--即需要同时满足两个必要因素,想要吃面--需要面和叉,造成资源的互相抢占问题

死锁代码示例:

import time
from threading import Thread,Lock
def eat1(noodle_lock,fork_lock,name):
    noodle_lock.acquire()
    print('%s抢到了面'%name)
    fork_lock.acquire()
    print('%s抢到了叉子'%name)
    print('%s正在吃面'%name)
    fork_lock.release()
    print('%s归还了叉子' % name)
    noodle_lock.release()
    print('%s归还了面' % name)

def eat2(noodle_lock,fork_lock,name):
    fork_lock.acquire()
    print('%s抢到了叉子' % name)
    time.sleep(0.5)
    noodle_lock.acquire()
    print('%s抢到了面'%name)
    print('%s正在吃面'%name)
    noodle_lock.release()
    print('%s归还了面' % name)
    fork_lock.release()
    print('%s归还了叉子' % name)

if __name__ == '__main__':
    noodle_lock = Lock()
    fork_lock = Lock()
    Thread(target=eat1,args=(noodle_lock,fork_lock,'zhangsan')).start()
    Thread(target=eat2,args=(noodle_lock,fork_lock,'lisi')).start()
    Thread(target=eat1,args=(noodle_lock,fork_lock,'wangwu')).start()
    Thread(target=eat2,args=(noodle_lock,fork_lock,'zhuliu')).start()

执行结果

只有一个锁,不会出现死锁,那么有多个锁的情况下,就会出现死锁

如何解决这个问题呢?使用递归锁

同一个线程中对同一个锁多次acquire不会产生阻塞

递归锁 -- 错误示例

from threading import Thread,RLock
def func(rlock,flag):
    rlock.acquire()
    print(flag*10)
    rlock.acquire()
    print(flag * 10)
    rlock.acquire()
    print(flag * 10)
    rlock.acquire()
    print(flag * 10)

rlock = RLock()
Thread(target=func,args=(rlock,'*')).start()
Thread(target=func,args=(rlock,'-')).start()

执行结果

出现死锁的情况

两个线程(进程),一个拿着外层钥匙,一个拿着内层钥匙,谁也不给对方,谁都进不去,就出现了死锁

注释:

第一个线程过来,拿走了一串钥匙

每次acquire,就进入一个房间,最后在房间的最里面,因为没有release,所以出不来,阻塞在里面

如图

它每走出一个房间,需要release一次,将钥匙放到最外面门上,让下个进程进去,所以有几次acquire,就有几次release,跟函数的递归类似,怎么解决上面卡住的问题?加上4次release就可以了,如下

from threading import Thread,RLock
def func(rlock,flag):
    rlock.acquire()
    print(flag*10)
    rlock.acquire()
    print(flag * 10)
    rlock.acquire()
    print(flag * 10)
    rlock.acquire()
    print(flag * 10)
    rlock.release()
    rlock.release()
    rlock.release()
    rlock.release()

rlock = RLock()
Thread(target=func,args=(rlock,'*')).start()
Thread(target=func,args=(rlock,'-')).start()

执行结果

**********

**********

**********

**********

----------

----------

----------

----------

使用递归锁,解决科学家吃面的问题,代码如下

import time
from threading import Thread,RLock
def eat1(noodle_lock,fork_lock,name):
    noodle_lock.acquire()
    print('%s抢到了面'%name)
    fork_lock.acquire()
    print('%s抢到了叉子'%name)
    print('%s正在吃面'%name)
    fork_lock.release()
    print('%s归还了叉子' % name)
    noodle_lock.release()
    print('%s归还了面' % name)

def eat2(noodle_lock,fork_lock,name):
    fork_lock.acquire()
    print('%s抢到了叉子' % name)
    time.sleep(0.5)
    noodle_lock.acquire()
    print('%s抢到了面'%name)
    print('%s正在吃面'%name)
    noodle_lock.release()
    print('%s归还了面' % name)
    fork_lock.release()
    print('%s归还了叉子' % name)

if __name__ == '__main__':
    fork_lock = noodle_lock = RLock()  # 表示同一串钥匙
    Thread(target=eat1,args=(noodle_lock,fork_lock,'zhangsan')).start()
    Thread(target=eat2,args=(noodle_lock,fork_lock,'lisi')).start()
    Thread(target=eat1,args=(noodle_lock,fork_lock,'wangwu')).start()
    Thread(target=eat2,args=(noodle_lock,fork_lock,'zhuliu')).start()

执行结果

什么情况下,需要用到递归锁呢?

有超过一个资源需要锁的时候 -- 递归锁

信号量

同进程一样

Semaphore管理一个内置的计数器,

每当调用acquire()时内置计数器-1;

调用release() 时内置计数器+1;

计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有2个线程可以获得semaphore,即可以限制最大连接数为2):

from threading import Thread,Semaphore,current_thread
import time
def func():
    sm.acquire()
    print('{} get sm'.format(current_thread().getName()))
    time.sleep(3)
    sm.release()

if __name__ == '__main__':
    sm = Semaphore(2)
    for i in range(5):
        t = Thread(target=func)
        t.start()

执行结果

Thread-1 get sm

Thread-2 get sm

Thread-3 get sm

Thread-4 get sm

Thread-5 get sm

与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

事件

同进程一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

实例:

from threading import Thread,Event,current_thread
import time
import random

def conn_mysql(event):
    count = 1
    while count < 4:
        print('第{}次尝试连接'.format(current_thread().getName(),count))
        event.wait(0.5)
        # 如果不传参数会一直等到事件为True为止
        # 如果传参数,传一个时间参数,等到多少秒,类似time.sleep()
        count += 1
        if event.is_set():
            print('连接成功'.format(current_thread().getName()))
            break
    else:
        print('连接失败')

def check_mysql(event):
    print('{}正在检查mysql'.format(current_thread().getName()))
    time.sleep(random.randint(1,2))
    event.set()


if __name__ == '__main__':
    event = Event()
    conn1 = Thread(target=conn_mysql,args=(event,))
    check = Thread(target=check_mysql,args=(event,))
    conn1.start()
    check.start()

执行结果

Thread-1第1次尝试连接

Thread-2正在检查mysql

Thread-1第2次尝试连接

Thread-1第3次尝试连接

Thread-1连接成功

条件

使得线程等待,只有满足某条件时,才释放n个线程

详细说明

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire
和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则
wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重
新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

代码说明:

import threading
def run(n):
    con.acquire()
    con.wait()
    print("run the thread: {}".format(n))
    con.release()


if __name__ == '__main__':
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run,args=(i,))
        t.start()
    while True:
        inp = input('>>>').strip()
        if inp == 'q':
            break
        con.acquire()
        con.notify(int(inp))
        con.release()
        print('******')

执行结果

定时器

定时器,指定n秒后执行某个操作

from threading import Timer
def hello():
    print('hello, world')

t = Timer(2, hello)  # 2秒后执行hello
t.start()

执行结果

hello, world

线程队列

queue队列 :使用import queue,用法与进程Queue一样

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

class queue.Queue(maxsize=0) #先进先出

import queue
q = queue.Queue()  # 先进先出,先打印1
q.put('1')
q.put('2')
q.put('3')
print(q.get())
print(q.get())
print(q.get())

执行结果

1

2

3

class queue.LifoQueue(maxsize=0) #后进先出

import queue
q = queue.LifoQueue()  # 后进先出,先打印3
q.put('1')
q.put('2')
q.put('3')
print(q.get())
print(q.get())
print(q.get())

执行结果

3

2

1

class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

优先级队列

import queue
q = queue.PriorityQueue()  # 优先级队列,数字越小优先级越高
                           # 字符串,则按照asicc码顺序排列
q.put((1,'a'))
q.put((10,'b'))
q.put((5,'c'))
print(q.get())
print(q.get())
print(q.get())

执行结果

(1, 'a')

(5, 'c')

(10, 'b')

Python标准模块--concurrent.futures

线程池

能够在多线程的基础上进一步节省内存和时间开销

#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

#shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数

示例代码

import time
from concurrent.futures import ThreadPoolExecutor
def func(i):
    print(i*'*')
    time.sleep(1)

thread_pool =  ThreadPoolExecutor(5)  # ThreadPoolExecutor(5)设置线程数为5
for i in range(10):
    thread_pool.submit(func, i)  # submit 相当于apply_async 异步
thread_pool.shutdown()  # shutdown 相当于 close+join
print('1111')  # 如果想最后打印 1111,需要执行shutdown()

执行结果

*

**

***

****

*****

******

*******

********

*********

1111

上面代码 return一个值 异步代码示例

import time
from concurrent.futures import ThreadPoolExecutor
def func(i):
    print(i*'*')
    time.sleep(1)
    return i ** 2

thread_pool = ThreadPoolExecutor(5)  # 设置线程池数量为5
ret_lst = []
for i in range(10):
    ret = thread_pool.submit(func, i)  # submit 相当于apply_async 异步
    ret_lst.append(ret)
thread_pool.shutdown()  # shutdown 相当于 close + join
for ret in ret_lst:
    print(ret.result())  # result 返回值
print('111111')

执行结果

map的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
    print('{} is runing'.format(os.getpid()))
    time.sleep(random.randint(1, 3))
    return n**2

if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=3)
    executor.map(task, range(1,12))  # map取代了for + submit

执行结果

回调函数

import time
from concurrent.futures import ThreadPoolExecutor
def func(i):
    print(i*'*')
    time.sleep(1)
    return i**2

def callb(arg):
    print(arg.result()*'-')

if __name__ == '__main__':
    thread_pool = ThreadPoolExecutor(5)
    for i in range(10):
        # 相当于apply_async  add_done_callback 回调函数
        thread_pool.submit(func,i).add_done_callback(callb)  #将callb函数的返回值作为参数
    thread_pool.shutdown()  # 相当于 close+join
    print('wahaha')

执行结果

当内存不需要共享,且高计算的时候 用进程

当内存需要共享,且高IO的时候 用线程

当并发很大的时候

    设计模式

    多进程 : 多个任务 —— 进程池 :cpu个数、cpu个数+1

    多线程 :多个任务  —— 线程池 :cpu个数*5

    4核  : 4个、5个进程 —— 20条线程/进程   : 80-100个任务

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏圆方圆学院精选

【刘文彬】 Debug EOS:nodeos + mongo_db_plugin

原文链接:醒者呆的博客园,https://www.cnblogs.com/Evsward/p/storage.html

1342
来自专栏于晓飞的专栏

Java 并发 学习笔记

最近重新复习了一边并发的知识,发现自己之前对于并发的了解只是皮毛。这里总结以下Java并发需要掌握的点。

1122
来自专栏UDNZ

记一次 .NET Framework 不兼容 HTTP COOKIE 协议标准的问题跟踪

我们在后端系统实现了 HTTP 请求的代理类,用于请求其他第三方系统。

3158
来自专栏吴生的专栏

谁说深入浅出虚拟机难?现在我让他通俗易懂(JVM)

1:什么是JVM 大家可以想想,JVM 是什么?JVM是用来干什么的?在这里我列出了三个概念,第一个是JVM,第二个是JDK,第三个是JRE。相信大家对这三个不...

3906
来自专栏CSDN技术头条

一图读懂JVM架构解析

本文阐述了JVM的构成和组件,配图清晰易懂,是学习Java开发者的入门必读文章。 每个Java开发人员都知道字节码经由JRE(Java运行时环境)执行。但他们或...

2208
来自专栏PPV课数据科学社区

python多线程编程(1): python对多线程的支持

前面介绍过多线程的基本概念,理解了这些基本概念,掌握python多线程编程就比较容易了。 在开始之前,首先要了解一下python对多线程的支持。 虚拟机层面 P...

37215
来自专栏林冠宏的技术文章

java 线程 Thread 使用介绍,包含wait(),notifyAll() 等函数使用介绍

(原创,转载请说明出处!谢谢--https://cloud.tencent.com/developer/user/1148436/activities)  此文...

1967
来自专栏逸鹏说道

Python3 与 C# 并发编程之~ 进程实战篇

之前说过 Queue:在 Process之间使用没问题,用到 Pool,就使用 Manager().xxx, Value和 Array,就不太一样了:

1684
来自专栏coder修行路

Beego 框架学习(一)

 Beego官网本身已经整理的非常详细了,但是作为一个学习者,我还是决定自己好好整理一下,这样在后面使用的时候自己对每部分才能非常熟悉,即使忘记了,也可以迅速定...

5448
来自专栏小灰灰

Java可以如何实现文件变动的监听

应用中使用logback作为日志输出组件的话,大部分会去配置 logback.xml 这个文件,而且生产环境下,直接去修改logback.xml文件中的日志级别...

2498

扫码关注云+社区

领取腾讯云代金券