首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Python3并发编程-多进程multiprocessing

进程之间是相互独立的,各个进程之间的数据是不能共享的。Python3中的multiprocess包提供了进程相关操作的类,如Process类等。

1:子进程创建

1.1:Process类创建子进程

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process # print(help(Process)) def func(pid): print(f'父进程:', pid) print(f'子进程:',os.getpid()) print(f'子进程的父进程:',os.getppid()) time.sleep(10) if __name__ == '__main__': print(f'主进程的父进程ID号:{os.getppid()}') print(f'主进程的进程ID号:{os.getpid()}') #创建一个进程,target:新创建的这个进程会去执行func这个函数 p1 = Process(target=func,args=(os.getpid(),)) #启动进程 p1.start() print('主进程结束')

1.2:Process子类创建子进程

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process # print(help(Process)) # 需要实现类的run方法,在实例调用start()方法之后会自动调用run方法。class MProcess(Process): def __init__(self,ppid): # 需要什么参数就传递什么 super().__init__() # 需要调用父类中的 __init__()方法 self.ppid = ppid def run(self): print(f'父进程:', self.ppid) print(f'子进程:', os.getpid()) print(f'子进程的父进程:', os.getppid()) time.sleep(10) if __name__ == '__main__': print(f'主进程的父进程ID号:{os.getppid()}') print(f'主进程的进程ID号:{os.getpid()}') #创建一个进程,target:新创建的这个进程要去执行func这个函数 p1 = MProcess(os.getpid()) #启动进程 p1.start()print('主进程结束')

2:Process类中的方法

2.1:join()方法:等待子进程结束

查看帮助:# print(help(p1))# print(help(p1.join))join(timeout=None) method of multiprocessing.context.Process instance   Wait until child process terminates

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process # print(help(Process)) def func(pid): print(f'父进程:', pid) print(f'子进程:',os.getpid()) print(f'子进程的父进程:',os.getppid()) time.sleep(10) print('子进程结束') if __name__ == '__main__': print(f'主进程的父进程ID号:{os.getppid()}') print(f'主进程的进程ID号:{os.getpid()}') #创建一个进程,target:新创建的这个进程会去执行func这个函数 p1 = Process(target=func,args=(os.getpid(),)) # print(help(p1)) print(help(p1.join)) #启动进程 p1.start() # p1.join(4) # 设置超时,超过时间子进程还没有结束,不会再等待下去 p1.join() # 一直阻塞在这里,直到子进程结束 print('-'*30) print('主进程结束')

2.2:terminate()方法:终止进程工作

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process # print(help(Process)) def func(pid): print(f'父进程:', pid) print(f'子进程:',os.getpid()) print(f'子进程的父进程:',os.getppid()) while True: time.sleep(1) print(f'子进程:', os.getpid()) print(f'子进程的父进程:', os.getppid()) print('-'*30) print('子进程结束') if __name__ == '__main__': print(f'主进程的父进程ID号:{os.getppid()}') print(f'主进程的进程ID号:{os.getpid()}') #创建一个进程,target:新创建的这个进程会去执行func这个函数 p1 = Process(target=func,args=(os.getpid(),)) #启动进程 p1.start() print('-'*30) time.sleep(10) p1.terminate() # 结束子进程 time.sleep(3) print('子进程状态:',p1.is_alive()) print('主进程中-子进程id:', p1.pid) print('主进程结束')

2.3:守护进程:daemon属性

在主进程结束的时候,子进程也会终止;terminate()方法是主进程还在的时候,终止子进程。

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process # print(help(Process)) def func(pid): print(f'父进程:', pid) print(f'子进程:',os.getpid()) print(f'子进程的父进程:',os.getppid()) while True: time.sleep(1) print(f'子进程:', os.getpid()) print(f'子进程的父进程:', os.getppid()) print('-'*30) print('子进程结束') if __name__ == '__main__': print(f'主进程的父进程ID号:{os.getppid()}') print(f'主进程的进程ID号:{os.getpid()}') #创建一个进程,target:新创建的这个进程会去执行func这个函数 p1 = Process(target=func,args=(os.getpid(),)) #启动进程 p1.start() print('-'*30) time.sleep(10) p1.terminate() # 结束子进程 time.sleep(3) print('子进程状态:',p1.is_alive()) print('主进程中-子进程id:', p1.pid) print('主进程结束')

3:进程间通信与同步

进程之间是相互独立的,每个进程都有自己的地址空间,代码段、堆栈段和数据段。其主要通过以下方法来进行进程之间的通信。

3.1:管道Pipe

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process,Pipe def func1(conn): while True: # time.sleep(10000) # 这里卡住,来测试conn.send(i) 是不是阻塞 msg = conn.recv() # 没有数据了的时候,会阻塞在这里, print(f">>{msg}") # conn.send(1000) # 收发在不同的进程中进行,不然会卡住。 if msg == "exit": # break # time.sleep(2) # 产生数据,向管道中写def func2(conn): i = 0 while True: # print(f'in func2 {i}') # 打印日志,来测试是否阻塞 conn.send(i) # 如果只是向管道中写,达到一定数量(我机子上是1408)就会阻塞 i += 1 if i > 10000: break def main(): send, recv = Pipe() p_list = [] for i in range(5): # 创建5个进程来读数据 p1 = Process(target=func1, args=(recv,)) p1.start() p_list.append(p1) p2 = Process(target=func2, args=(send,)) # 产生数据,或者叫做“生产者” p2.start() p2.join() print('p2.join() 之后') # 执行到这里,说明产生数据的进程,已结束。 # 给各个读进程发送结束命令 for i in range(5): send.send('exit') for p in p_list: p.join() print("主进程结束") if __name__ == '__main__': main()

3.2:队列Queue

# -*- coding: utf-8 -*- import osimport timefrom multiprocessing import Process,Queue def func1(q): while True: msg = q.get() # 当 q.empty() 为True(即队列为空)的时候,get会阻塞 print(f">>{msg}") if msg == "exit": # break # time.sleep(2) # 产生数据def func2(q): i = 0 while True: # stru_t = time.localtime() # strTime = time.strftime('%Y-%m-%d %H:%M:%S', stru_t) q.put(i) # 当 q.full() 为True(即队列已满时会阻塞) i += 1 if i > 10000: break def main(): # 队列大小为5,当q中的数据达到5条时,再执行q.put()操作会阻塞 # 队列为空的时,执行q.get()操作会阻塞 q = Queue(5) p_list = [] for i in range(5): # 创建5个进程来读数据 p1 = Process(target=func1, args=(q,)) p1.start() p_list.append(p1) p2 = Process(target=func2, args=(q,)) # 产生数据,或者叫做“生产者” p2.start() p2.join() # 执行到这里,说明产生数据的进程已结束。 # 给各个读进程发送结束命令 for i in range(5): q.put('exit') for p in p_list: p.join() print("主进程结束") if __name__ == '__main__': main()

3.3:队列JoinableQueue

# -*- coding: utf-8 -*- import osimport timeimport tracebackfrom multiprocessing import Process,JoinableQueue #def func1(q): while True: try: msg = q.get() # msg = q.get(False) 当q为空时会抛出异常 print(f">>{msg}") q.task_done() # 处理完成数据,需要给q对象发送了一个任务结束的信号 except Exception: val = traceback.format_exc() print(val) break # time.sleep(2) # 产生数据:def func2(q): i = 0 while True: q.put(i) # i += 1 if i > 10000: break q.join() #也可以放在这里 def main(): q = JoinableQueue(5) p_list = [] for i in range(5): # 创建5个进程来读数据 p1 = Process(target=func1, args=(q,)) p1.daemon = True # daemon 设置为 True时,父进程结束时,子进程结束也会结束。, p1.start() p_list.append(p1) p2 = Process(target=func2, args=(q,)) # 产生数据,或者叫做“生产者” p2.start() p2.join() # 这里需要等待产生数据的进程结束 # # q.join() # 等待队列中的数据处理完,不需要像之前那样发送"exit"来结束进程;也可以放到func2中去。 print("主进程结束") if __name__ == '__main__': main()

3.4:共享数据:Managers

各个进程之间是相互独立的,进程之间的数据也是独立的,不能共享。python3中提供了Managers模块来实现各个进程之间数据的共享。

# -*- coding: utf-8 -*- import osimport timeimport tracebackfrom multiprocessing import Process,Manager #def func1(dic,lst,index,dic2): # 观察 dic,lst,dic2 的内存地址,在子进程中与在父进程中都不同; print(f'子进程{index},id(dic):{id(dic)},id(dic2):{id(dic2)},id(lst):{id(lst)}') dic[index] = os.getpid() # dic是由Manager创建的,显然每个进程中的地址都不同,但其中的值是共享的,在每个进程中都能访问,修改; dic2[index] = os.getpid() # 只有在自己进程中可以访问 while True: try: s = lst.pop() print(f'进程{index},取出:{s}: 剩下lst={lst}') if len(lst) break time.sleep(1) except: break print(f'子进程{index}结束:dic:{dic},dic2:{dic2},lst:{lst}') def main(): with Manager() as manager: dic = manager.dict() # 通过manager创建的数据才能在进程间共享,操作其创建的变量不需要加锁,Manager已默认加锁。 lst = manager.list(range(20)) dic2 = {} # 这种方式创建的变量,只能是在本进程中使用, print(f'主进程:id(dic):{id(dic)},id(dic2):{id(dic2)},id(lst):{id(lst)}') p_list = [] for index in range(5): # 创建5个进程来读数据 p1 = Process(target=func1, args=(dic,lst,index,dic2,)) p1.start() p_list.append(p1) for p in p_list: p.join() # 等待子进程结束 # 打印结果 print("打印结果:") print(dic) print(dic2) print(lst) print(f'主进程结束:id(dic):{id(dic)},id(dic2):{id(dic2)},id(lst):{id(lst)}') print("主进程结束") if __name__ == '__main__': main()

3.5:信号量:Semaphore

# -*- coding: utf-8 -*- import timeimport randomfrom multiprocessing import Process,Semaphore,Manager def func1(index,s,lst,dic): # print(lst) while True: try: s.acquire() # 每次acquire就减1,直到减到0,为0时就等待 val = lst.pop() s.release() dic['cnt'] += 1 print(f'{dic["cnt"]},进程{index},取出:{val}: 剩下lst={lst}') if len(lst) break # time.sleep(1) except: s.release() # 释放 break def main(): s = Semaphore(5) # 创建一个计数器,同时只能5个进程在运行 with Manager() as manager: # print(help(manager)) # print(help(Manager)) lst = manager.list(range(20)) dic = manager.dict() dic['cnt'] = 0 # 使用共享变量来计数,manager创建的变量不需要加锁,Manager已默认加锁。 print(dic['cnt'] ) p_list = [] for i in range(10): p1 = Process(target=func1, args=(i, s, lst,dic,)) p1.start() p_list.append(p1) for p in p_list: p.join() # 等待子进程结束 print(dic['cnt']) if __name__ == '__main__': main()

3.6:事件:Event

# -*- coding: utf-8 -*-import timefrom multiprocessing import Process,Event def light(e): while 1: print('现在是红灯:') time.sleep(5) e.set() # 设置event的状态值为True ; print('现在是绿灯:') time.sleep(3) e.clear() # 恢复event的状态值为False。 def car(index,e): if e.is_set(): # 返回event的状态值; # 状态为True print(f' 现在是绿灯,{index}:过马路中!') else: print(f' 现在是红灯{index}:等待中!') e.wait() # 如果 event.isSet()==False将阻塞 print(f' 红灯变绿灯,{index}:可以走了!') if __name__ == '__main__': e = Event() lgh = Process(target=light,args=(e,)) lgh.start() cnt = 0 while 1: time.sleep(1) # 每隔1秒来一辆car p1 = Process(target=car,args=(cnt,e,)) p1.start() cnt += 1

3.7:锁:Lock

锁是为了实现数据的唯一性,即同时只能由一个进程对其操作;

# -*- coding: utf-8 -*- import jsonimport timeimport random from multiprocessing import Process,Lock def get_ticket(index,ticket_lock): ticket_lock.acquire() # 获取锁,如果被其他进程获得,此处将会阻塞, with open('ticket','r' ) as f: last_ticket_info = json.load(f) last_ticket = last_ticket_info['count'] if last_ticket > 0: print(f'{index}号进程,抢到票,抢到{last_ticket}号票') time.sleep(random.random()) last_ticket -= 1 last_ticket_info['count'] = last_ticket with open('ticket','w') as f: json.dump(last_ticket_info, f) else: print(f'{index}号进程,没有票了,明天再来') ticket_lock.release() # 释放锁 def main(): ticket_lock = Lock() for i in range(10): p = Process(target=get_ticket, args=(i, ticket_lock)) p.start() if __name__ == '__main__': main() """ticket文件中的内容为:{"count": 5}"""

3.8:进程池:Pool

进程池就是先创建一些进程放在那,当有处理任务来的时候,就去进程池中获取一个进程来处理这个任务,在Python3中提供了Pool模块来处理进程池的相关操作,不需要我们自己来实现进程池;

进程池中有两个方法:

apply:同步,一般不使用

apply_async:异步

# -*- coding: utf-8 -*- from multiprocessing import Process,Poolimport os, time, random def fun1(index): print(f"开始处理[{index}]-进程{os.getpid()}.") start = time.time() time.sleep(random.random() * 3) end = time.time() print(f"结束处理[{index}]-进程{os.getpid()},处理时间:{end - start}.") def gen_data(pool): cnt = 0; while True: pool.apply_async(func=fun1, args=(cnt,)) cnt += 1 # time.sleep(1) if cnt > 100: break def main(): pool = Pool(6) # 创建一个6个进程的进程池 gen_data(pool) pool.close() # close表示不能添加处理任务 pool.join() # 等待进程池处理完所有任务 if __name__=='__main__': main() print('主进程结束')

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200808A07B5800?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券