首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 与 C# 并发编程之~ 线程篇2

Python3 与 C# 并发编程之~ 线程篇2

原创
作者头像
逸鹏
发布2018-09-07 08:38:08
6140
发布2018-09-07 08:38:08
举报
文章被收录于专栏:逸鹏说道逸鹏说道

2.2.加强篇

其实以前的 Linux中是没有线程这个概念的, Windows程序员经常使用线程,这一看~方便啊,然后可能是当时程序员偷懒了,就把进程模块改了改(这就是为什么之前说Linux下的多进程编程其实没有Win下那么“重量级”),弄了个精简版进程==> 线程(内核是分不出 进程和线程的,反正 PCB个数都是一样)

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享(全局变量和堆 ==> 线程间共享。进程的栈 ==> 线程平分而独占

还记得通过 current_thread()获取的线程信息吗?难道线程也没个id啥的?一起看看:(通过 ps-Lfpid来查看LWP

1.线程ID.png
1.线程ID.png

回顾:进程共享的内容:(回顾:http://www.cnblogs.com/dotnetcrazy/p/9363810.html

  1. 代码(.text)
  2. 文件描述符(fd)
  3. 内存映射(mmap)

2.2.1.线程同步~互斥锁Lock

线程之间共享数据的确方便,但是也容易出现数据混乱的现象,来看个例子:

from multiprocessing.dummy import threadingnum = 0  # def global numdef test(i):    print(f"子进程:{i}")    global num    for i in range(100000):        num += 1def main():    p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)]    for i in p_list:        i.start()    for i in p_list:        i.join()    print(num)  # 应该是500000,发生了数据混乱,结果少了很多if __name__ == '__main__':    main()

输出:(应该是 500000,发生了数据混乱,只剩下 358615

子进程:0子进程:1子进程:2子进程:3子进程:4452238
Lock案例

共享资源+CPU调度==>数据混乱==解决==>线程同步 这时候 Lock就该上场了

互斥锁是实现线程同步最简单的一种方式,读写都加锁(读写都会串行)

先看看上面例子怎么解决调:

from multiprocessing.dummy import threading, Locknum = 0  # def global numdef test(i, lock):    print(f"子进程:{i}")    global num    for i in range(100000):        with lock:            num += 1def main():    lock = Lock()    p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)]    for i in p_list:        i.start()    for i in p_list:        i.join()    print(num)if __name__ == '__main__':    main()

输出: time python31.thread.2.py

子进程:0子进程:1子进程:2子进程:3子进程:4500000real    0m2.846suser    0m1.897ssys    0m3.159s
优化下

lock设置为全局或者局部,性能几乎一样。循环换成map后性能有所提升(测试案例在Code中

from multiprocessing.dummy import Pool as ThreadPool, Locknum = 0  # def global numlock = Lock()def test(i):    print(f"子进程:{i}")    global num    global lock    for i in range(100000):        with lock:            num += 1def main():    p = ThreadPool()    p.map_async(test, list(range(5)))    p.close()    p.join()    print(num)if __name__ == '__main__':    main()

输出:

time python31.thread.2.py

子进程:0子进程:1子进程:3子进程:2子进程:4500000real    0m2.468suser    0m1.667ssys    0m2.644s

本来多线程访问共享资源的时候可以并行,加锁后就部分串行了(没获取到的线程就阻塞等了)

项目中可以多次加锁,每次加锁只对修改部分加(尽量少的代码) 】(以后会说协程和Actor模型

补充:以前都是这么写的,现在支持 with托管了(有时候还会用到,所以了解下):【net是直接 lock大括号包起来

#### 以前写法:lock.acquire() # 获取锁try:    num += 1finally:    lock.release() # 释放锁#### 等价简写with lock:    num += 1

扩展知识:(GIL在扩展篇会详说)

  1. GIL的作用:多线程情况下必须存在资源的竞争,GIL是为了保证在解释器级别的线程唯一使用共享资源(cpu)。
  2. 同步锁的作用:为了保证解释器级别下的自己编写的程序唯一使用共享资源产生了同步锁

2.2.2.线程同步~递归锁RLock

看个场景:小明欠小张2000,欠小周5000,现在需要同时转账给他们:(规定:几次转账加几次锁

2.RLock.png
2.RLock.png

小明啥也没管,直接撸起袖子就写Code了:(错误Code示意

from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock):    global xiaoming    global xiaozhang    global xiaozhou    # 小明想一次搞定:    with lock:        # 小明转账2000给小张        xiaoming -= 2000        xiaozhang += 2000        with lock:            # 小明转账5000给小周            xiaoming -= 5000            xiaozhou += 5000def main():    print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")    lock = Lock()    p = ThreadPool()    p.apply_async(test, args=(lock, ))    p.close()    p.join()    print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':    main()

小明写完代码就出去了,这可把小周和小张等急了,打了N个电话来催,小明心想啥情况?

一看代码楞住了,改了改代码,轻轻松松把钱转出去了:

from multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 8000xiaozhang = 3000xiaozhou = 5000# 小明转账2000给小张def a_to_b(lock):    global xiaoming    global xiaozhang    with lock:        xiaoming -= 2000        xiaozhang += 2000# 小明转账5000给小周def a_to_c(lock):    global xiaoming    global xiaozhou    with lock:        xiaoming -= 5000        xiaozhou += 5000def main():    print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")    lock = Lock()    p = ThreadPool()    p.apply_async(a_to_b, args=(lock, ))    p.apply_async(a_to_c, args=(lock, ))    p.close()    p.join()    print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':    main()

输出:

[还钱前]小明8000,小张3000,小周5000[还钱后]小明1000,小张5000,小周10000

就这么算了吗?不不不,不符合小明性格,于是小明研究了下,发现~还有个递归锁 RLock呢,正好解决他的问题:

from multiprocessing.dummy import Pool as ThreadPool, RLock  # 就把这边换了下xiaoming = 8000xiaozhang = 3000xiaozhou = 5000def test(lock):    global xiaoming    global xiaozhang    global xiaozhou    # 小明想一次搞定:    with lock:        # 小明转账2000给小张        xiaoming -= 2000        xiaozhang += 2000        with lock:            # 小明转账5000给小周            xiaoming -= 5000            xiaozhou += 5000def main():    print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")    lock = RLock()  # 就把这边换了下    p = ThreadPool()    p.apply_async(test, args=(lock, ))    p.close()    p.join()    print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")if __name__ == '__main__':    main()

RLock内部维护着一个 Lock和一个counter变量, counter记录了acquire的次数,从而使得资源可以被多次 require。直到一个线程所有的 acquire都被release,其他的线程才能获得资源


2.2.3.死锁引入

1.多次获取导致死锁

小明想到了之前说的(互斥锁 Lock读写都加锁)就把代码拆分研究了下:

print("[开始]小明转账2000给小张")lock.acquire()  # 获取锁xiaoming -= 2000xiaozhang += 2000print("[开始]小明转账5000给小周")lock.acquire()  # 获取锁(互斥锁第二次加锁)xiaoming -= 5000xiaozhou += 5000lock.release()  # 释放锁print("[结束]小明转账5000给小周")lock.release()  # 释放锁print("[开始]小明转账2000给小张")

输出发现:(第二次加锁的时候,变成阻塞等了【死锁】)

[还钱前]小明8000,小张3000,小周5000[开始]小明转账2000给小张[开始]小明转账5000给小周

这种方式,Python提供的RLock就可以解决了

2.常见的死锁

看个场景:小明和小张需要流水帐,经常互刷~ 小明给小张转账1000,小张给小明转账1000

一般来说,有几个共享资源就加几把锁(小张、小明就是两个共享资源,所以需要两把 Lock

先描述下然后再看代码:

正常流程 小明给小张转1000:小明自己先加个锁==>小明-1000==>获取小张的锁==>小张+1000==>转账完毕

死锁情况 小明给小张转1000:小明自己先加个锁==>小明-1000==>准备获取小张的锁。可是这时候小张准备转账给小明,已经把自己的锁获取了,在等小明的锁(两个人相互等,于是就一直死锁了)

代码模拟一下过程:

from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock() # 小明的锁z_lock = Lock() # 小张的锁# 小明转账1000给小张def a_to_b():    global xiaoming    global xiaozhang    global m_lock    global z_lock    with m_lock:        xiaoming -= 1000        sleep(0.01)        with z_lock:            xiaozhang += 1000# 小张转账1000给小明def b_to_a():    global xiaoming    global xiaozhang    global m_lock    global z_lock    with z_lock:        xiaozhang -= 1000        sleep(0.01)        with m_lock:            xiaoming += 1000def main():    print(f"[还钱前]小明{xiaoming},小张{xiaozhang}")    p = ThreadPool()    p.apply_async(a_to_b)    p.apply_async(b_to_a)    p.close()    p.join()    print(f"[还钱后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__':    main()

输出:(卡在这边了)

[转账前]小明5000,小张8000

项目中像这类的情况,一般都是这几种解决方法:(还有其他解决方案,后面会继续说)

  1. 按指定顺序去访问共享资源
  2. 在访问其他锁的时候,先把自己锁解了
  3. trylock的重试机制
  4. 得不到全部锁就先放弃已经获取的资源

比如上面的情况,我们如果规定,不管是谁先转账,先从小明开始,然后再小张,那么就没问题了。或者谁钱多就谁(权重高的优先)

from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Lockxiaoming = 5000xiaozhang = 8000m_lock = Lock()  # 小明的锁z_lock = Lock()  # 小张的锁# 小明转账1000给小张def a_to_b():    global xiaoming    global xiaozhang    global m_lock    global z_lock    # 以上次代码为例,这边只修改了这块    with z_lock:  # 小张权重高,大家都先获取小张的锁        xiaozhang += 1000        sleep(0.01)        with m_lock:            xiaoming -= 1000# 小张转账1000给小明def b_to_a():    global xiaoming    global xiaozhang    global m_lock    global z_lock    with z_lock:        xiaozhang -= 1000        sleep(0.01)        with m_lock:            xiaoming += 1000def main():    print(f"[转账前]小明{xiaoming},小张{xiaozhang}")    p = ThreadPool()    p.apply_async(a_to_b)    p.apply_async(b_to_a)    p.close()    p.join()    print(f"[转账后]小明{xiaoming},小张{xiaozhang}")if __name__ == '__main__':    main()

输出:

[转账前]小明5000,小张8000[转账后]小明5000,小张8000

2.2.4.线程同步~条件变量Condition

条件变量一般都不是锁,只能能阻塞线程,从而减少不必要的竞争,Python内置了 RLock(不指定就是RLock)

看看源码:

class Condition:    """    实现条件变量的类。    条件变量允许一个或多个线程等到另一个线程通知它们为止    如果给出了lock参数而不是None,那必须是Lock或RLock对象作底层锁。    否则,一个新的RLock对象被创建并用作底层锁。    """    def __init__(self, lock=None):        if lock is None:            lock = RLock()        self._lock = lock        # 设置lock的acquire()和release()方法        self.acquire = lock.acquire        self.release = lock.release

再看看可不可以进行with托管:(支持)

def __enter__(self):    return self._lock.__enter__()def __exit__(self, *args):    return self._lock.__exit__(*args)

看个生产消费者的简单例子:(生产完就通知消费者)

from multiprocessing.dummy import Pool as ThreadPool, Conditions_list = []con = Condition()def Shop(i):    global con    global s_list    # 加锁保护共享资源    for x in range(5):        with con:            s_list.append(x)            print(f"[生产者{i}]生产商品{x}")            con.notify_all()  # 通知消费者有货了def User(i):    global con    global s_list    while True:        with con:            if s_list:                print(f"列表商品:{s_list}")                name = s_list.pop()  # 消费商品                print(f"[消费者{i}]消费商品{name}")                print(f"列表剩余:{s_list}")            else:                con.wait()def main():    p = ThreadPool()    # 两个生产者    p.map_async(Shop, range(2))    # 五个消费者    p.map_async(User, range(5))    p.close()    p.join()if __name__ == '__main__':    main()

输出:(list之类的虽然可以不加global标示,但是为了后期维护方便,建议加上)

[生产者0]生产商品0[生产者0]生产商品1列表商品:[0, 1][消费者0]消费商品1列表剩余:[0]列表商品:[0][消费者0]消费商品0列表剩余:[][生产者0]生产商品2列表商品:[2][消费者1]消费商品2列表剩余:[][生产者0]生产商品3[生产者1]生产商品0[生产者0]生产商品4列表商品:[3, 0, 4][消费者1]消费商品4列表剩余:[3, 0][生产者1]生产商品1[生产者1]生产商品2[生产者1]生产商品3[生产者1]生产商品4列表商品:[3, 0, 1, 2, 3, 4][消费者2]消费商品4列表剩余:[3, 0, 1, 2, 3]列表商品:[3, 0, 1, 2, 3][消费者0]消费商品3列表剩余:[3, 0, 1, 2]列表商品:[3, 0, 1, 2][消费者1]消费商品2列表剩余:[3, 0, 1]列表商品:[3, 0, 1][消费者3]消费商品1列表剩余:[3, 0]列表商品:[3, 0][消费者3]消费商品0列表剩余:[3]列表商品:[3][消费者3]消费商品3列表剩余:[]

通知方法:

  1. notify() :发出资源可用的信号,唤醒任意一条因 wait()阻塞的进程
  2. notifyAll() :发出资源可用信号,唤醒所有因wait()阻塞的进程

2.2.5.线程同步~信号量Semaphore(互斥锁的高级版)

记得当时在分析 multiprocessing.Queue源码的时候,有提到过(点我回顾

同进程的一样, semaphore管理一个内置的计数器,每当调用 acquire()时内置函数 -1,每当调用 release()时内置函数 +1

通俗讲就是:在互斥锁的基础上封装了下,实现一定程度的并行

举个例子,以前使用互斥锁的时候:(厕所就一个坑位,必须等里面的人出来才能让另一个人上厕所)

3.互斥锁.png
3.互斥锁.png

使用信号量之后:厕所坑位增加到5个(自己指定),这样可以5个人一起上厕所了==>实现了一定程度的并发

举个例子:(Python在语法这点特别爽,不用你记太多异同,功能差不多基本上代码也就差不多)

from time import sleepfrom multiprocessing.dummy import Pool as ThreadPool, Semaphoresem = Semaphore(5) # 限制最大连接数为5def goto_wc(i):    global sem    with sem:        print(f"[线程{i}]上厕所")        sleep(0.1)def main():    p = ThreadPool()    p.map_async(goto_wc, range(50))    p.close()    p.join()if __name__ == '__main__':    main()

输出:

4.semaphore.png
4.semaphore.png

可能看了上节回顾的会疑惑:源码里面明明是 BoundedSemaphore,搞啥呢?

其实 BoundedSemaphore就比 Semaphore多了个在调用 release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常

以上一个案例说事:你换成 BoundedSemaphore和上面效果一样==> sem=BoundedSemaphore(5)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.2.加强篇
    • 2.2.1.线程同步~互斥锁Lock
      • Lock案例
      • 优化下
    • 2.2.2.线程同步~递归锁RLock
      • 2.2.3.死锁引入
        • 1.多次获取导致死锁
        • 2.常见的死锁
      • 2.2.4.线程同步~条件变量Condition
        • 2.2.5.线程同步~信号量Semaphore(互斥锁的高级版)
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档