首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Python3并发编程-多线程threading

同一进程的各个线程间可以共享主线程的地址空间和各种资源。

1:线程的创建

1.1:Thread类创建线程

# -*- coding: utf-8 -*- from threading import Thread import osimport time def func(index,dic): print(f'线程{index};进程id={os.getpid()}') dic['cnt'] += 1 count = 0 while True: time.sleep(1) if count > 2: break count += 1 dic['cnt'] += 1 print(f'线程{index};cnt=[{dic["cnt"]}]') print(f'线程{index};cnt=[{dic["cnt"]}],退出') def main(): dic = {} dic['cnt'] = 0 t_list = [] for i in range(5): t = Thread(target=func, args=(i, dic,)) # 线程中数据是共享的,看dic中数据的变化 # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出 t.start() # print(t.getName()) # 获取线程名称 t_list.append(t) print(f'主线程,进程id={os.getpid()}') for t in t_list: t.join() # 等待线程的结束,非必须。 print(f'主线程;cnt=[{dic["cnt"]}],退出') if __name__ == '__main__': main()

1.2:继承Thread类的子类创建

# -*- coding: utf-8 -*- from threading import Thread import osimport time class MyThread(Thread): def __init__(self, index,dic): super().__init__() self.index = index self.dic = dic def run(self): # 调用start()函数之后会自动调用此函数 print(f'线程{self.index};进程id={os.getpid()}') self.dic['cnt'] += 1 count = 0 while True: time.sleep(1) if count > 2: break count += 1 self.dic['cnt'] += 1 print(f'线程{self.index};cnt=[{self.dic["cnt"]}]') print(f'线程{self.index};cnt=[{self.dic["cnt"]}],退出') def main(): dic = {} dic['cnt'] = 0 t_list = [] for i in range(5): t = MyThread(i, dic) # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出 t.start() # print(t.getName()) # 获取线程名称 t_list.append(t) print(f'主线程,进程id={os.getpid()}') for t in t_list: t.join() # 等待线程的结束,非必须。 print(f'主线程;cnt=[{dic["cnt"]}],退出') if __name__ == '__main__': main()

2:线程的同步

同一进程下的线程资源是共享的,但是对共享数据的操作是不安全的,因此需要进行同步操作。

2.1:锁:Lock

# -*- coding: utf-8 -*- from threading import Threadfrom multiprocessing import Lockimport osimport time def func(index,dic,lock): print(f'线程{index};进程id={os.getpid()}') while True: try: lock.acquire() cnt = dic['cnt'] time.sleep(0.0001) # 加点延时,不然看不到效果 if cnt > 0: dic['cnt'] -= 1 print(f'线程{index};获取到:{cnt}号票;剩下={dic["cnt"]};') else: lock.release() break lock.release() except: break print(f'线程{index};cnt=[{dic["cnt"]}],退出') def main(): lock = Lock() dic = {} dic['cnt'] = 20 t_list = [] for i in range(5): t = Thread(target=func, args=(i, dic,lock,)) # 线程中数据是共享的,看dic中数据的变化 # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出 t.start() # print(t.getName()) # 获取线程名称 t_list.append(t) print(f'主线程,进程id={os.getpid()}') for t in t_list: t.join() # 等待线程的结束,非必须。 print(f'主线程;cnt=[{dic["cnt"]}],退出') if __name__ == '__main__': main()

把锁注释掉,看运行结果:

# -*- coding: utf-8 -*- from threading import Threadfrom multiprocessing import Lockimport osimport time def func(index,dic,lock): print(f'线程{index};进程id={os.getpid()}') while True: try: # lock.acquire() cnt = dic['cnt'] time.sleep(0.0001) # 加点延时,不然看不到效果 if cnt > 0: dic['cnt'] -= 1 print(f'线程{index};获取到:{cnt}号票;剩下={dic["cnt"]};') else: # lock.release() break # lock.release() except: break print(f'线程{index};cnt=[{dic["cnt"]}],退出') def main(): lock = Lock() dic = {} dic['cnt'] = 20 t_list = [] for i in range(5): t = Thread(target=func, args=(i, dic,lock,)) # 线程中数据是共享的,看dic中数据的变化 # t.setDaemon(True) # 默认为False;设置为True时,主线程退出时,子线程也会退出 t.start() # print(t.getName()) # 获取线程名称 t_list.append(t) print(f'主线程,进程id={os.getpid()}') for t in t_list: t.join() # 等待线程的结束,非必须。 print(f'主线程;cnt=[{dic["cnt"]}],退出') if __name__ == '__main__': main()

2.2:死锁

死锁就是有多个锁,多个线程中互相等待对方的锁。

比如:

有两个锁:锁A,锁B;

有两个线程:线程A,线程B

在线程A中已获取锁A,在线程B中已获取锁B,然后在线程A中要获取锁B,而此时锁B已在线程B中被占有,线程A中只能等待;在线程B中,又要获取锁A,而锁A已在线程A中被占有,这时线程B只能等待;就这样形成了,线程A,线程B分别在等待对方释放自己所要的锁。

# -*- coding: utf-8 -*- from threading import Threadfrom multiprocessing import Lockimport osimport time from threading import Thread,Lockimport time def funcA(lockA,lockB): print(" funcA:需要A锁") lockA.acquire() time.sleep(0.2) print(" funcA:获得了:A锁") print(" funcA:需要B锁") lockB.acquire() # B锁 在funcB中被占有,而在funcB中又在等待A锁的释放 print(" funcA:获得了:B锁") lockB.release() lockA.release() def funcB(lockA,lockB): print(" funcB:需要B锁") lockB.acquire() time.sleep(0.2) print(" funcB:获得了:B锁") print(" funcB:需要A锁") # A锁 在funcA lockA.acquire() # A锁 在funcA中被占有,而在funcA中又在等待B锁的释放 print(" funcB:获得了:A锁") lockA.release() lockB.release() def main(): lockA = Lock() lockB = Lock() t_a = Thread(target=funcA, args=(lockA, lockB,)) t_a.start() t_b = Thread(target=funcB, args=(lockA, lockB,)) t_b.start() print("结束主线程") if __name__ == "__main__": main()

2.3:递归锁:RLock

# -*- coding: utf-8 -*- from threading import Threadfrom multiprocessing import Lockimport osimport time from threading import Thread,RLock # 递归锁import time def funcA(lockA,lockB): print(" funcA:需要A锁") lockA.acquire() time.sleep(0.2) print(" funcA:获得了:A锁") print(" funcA:需要B锁") lockB.acquire() print(" funcA:获得了:B锁") lockB.release() lockA.release() def funcB(lockA,lockB): print(" funcB:需要B锁") lockB.acquire() time.sleep(0.2) print(" funcB:获得了:B锁") print(" funcB:需要A锁") lockA.acquire() print(" funcB:获得了:A锁") lockA.release() lockB.release() def main(): lockA = lockB = RLock() t_a = Thread(target=funcA, args=(lockA, lockB,)) t_a.start() t_b = Thread(target=funcB, args=(lockA, lockB,)) t_b.start() print("结束主线程") if __name__ == "__main__": main()

2.4:信号量:BoundedSemaphore

互斥锁同时只允许一个线程更改数据,而信号量是同时允许多少个线程同时运行。

# -*- coding: utf-8 -*- from threading import Thread,BoundedSemaphore,active_countimport time def func(index, semaphore): semaphore.acquire() #加锁 print(f"线程:{index} 运行中...,当前活动线程数:{active_count()}") time.sleep(5) # 为了看到效果,休眠久点 semaphore.release() #释放 def main(): semaphore = BoundedSemaphore(3) # 最多允许3个线程同时运行 for i in range(20): t = Thread(target=func, args=(i, semaphore)) t.start() print(f"主线程:当前活动线程数:{active_count()}") if __name__ == '__main__': main()

2.5:事件:Event

# -*- coding: utf-8 -*-import timefrom threading import Thread,Event def light(e): while 1: print('现在是红灯:') time.sleep(5) e.set() # 设置event的状态值为True ; print('现在是绿灯:') time.sleep(3) e.clear() # 恢复event的状态值为False。 def car(index,e): if e.is_set(): # 返回event的状态值; # 状态为True print(f' 现在是绿灯,{index}:过马路中!') else: print(f' 现在是红灯{index}:等待中!') e.wait() # 如果 event.isSet()==False将阻塞 print(f' 红灯变绿灯,{index}:可以走了!') def main(): e = Event() lgh = Thread(target=light, args=(e,)) lgh.start() cnt = 0 while 1: time.sleep(1) # 每隔1秒来一辆car t1 = Thread(target=car, args=(cnt, e,)) t1.start() cnt += 1 if __name__ == '__main__': main()

2.6:线程池

2.6.1:submit方法

# -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport timefrom threading import current_thread def func(index): # print(index,current_thread().ident) time.sleep(0.1) return [index,index**2] if __name__ == "__main__": t_p = ThreadPoolExecutor(max_workers=6) t_ret_list = [] for i in range(20): t = t_p.submit(func, i) t_ret_list.append(t) for ret in t_ret_list: print(ret.result())

2.6.2:map方法

# -*- coding: utf-8 -*- from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutorimport timefrom threading import current_thread def func(index): # print(index,current_thread().ident) time.sleep(0.1) return [index,index**2] if __name__ == "__main__": t_p = ThreadPoolExecutor(max_workers=6) map_ret = t_p.map(func,range(20)) print(map_ret) for ret in map_ret: print(ret)

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200808A07B0D00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券