首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈 multiprocessing

浅谈 multiprocessing

作者头像
用户1278550
发布2018-08-08 17:50:54
3930
发布2018-08-08 17:50:54
举报
文章被收录于专栏:idbaidba

一前言 使用python进行并发处理多台机器/多个实例的时候,我们可以使用threading ,但是由于著名的GIL存在,实际上threading 并未提供真正有效的并发处理,要充分利用到多核CPU,我们需要使用多进程。Python提供了非常好用的多进程包--multiprocessing。multiprocessing 可以利用multiprocessing.Process对象来创建一个进程,该Process对象与Threading对象的用法基本相同,具有相同的方法(官方原话:"The multiprocessing package mostly replicates the API of the threading module.") 比如:start(),run(),join()的方法。multiprocessing包中也有Lock/Event/Semaphore/Condition/Pipe/Queue类用于进程之间的通信。话不多说 show me the code! 二使用 2.1 初识异同


下面的程序显示threading和multiprocessing的在使用方面的异同,相近的函数join(),start(),append() 等,并做同一件事情打印自己的进程pid

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import os
  4. import threading
  5. import multiprocessing
  6. def printer(msg):
  7. print(msg, os.getpid())
  8. print('Main begin:', os.getpid())
  9. # threading
  10. record = []
  11. for i in range(5):
  12. thread = threading.Thread(target=printer, args=('threading',))
  13. thread.start()
  14. record.append(thread)
  15. for thread in record:
  16. thread.join()
  17. # multi-process
  18. record = []
  19. for i in range(5):
  20. process = multiprocessing.Process(target=printer, args=('multiprocessing',))
  21. process.start()
  22. record.append(process)
  23. for process in record:
  24. process.join()
  25. print('Main end:', os.getpid())

输出结果

  1. Main begin: 9524
  2. threading 9524
  3. threading 9524
  4. threading 9524
  5. threading 9524
  6. threading 9524
  7. multiprocessing 9539
  8. multiprocessing 9540
  9. multiprocessing 9541
  10. multiprocessing 9542
  11. multiprocessing 9543
  12. Main end: 9524

从例子的结果可以看出多线程threading的进程id和主进程(父进程)pid一样 ,同为9524; 多进程打印的pid每个都不一样,for循环中每创建一个process对象都年开一个进程。其他相关的方法基本类似。 2.2 用法


创建进程的类: Process([group [, target [, name [, args [, kwargs]]]]]), target表示调用对象, args表示调用对象的位置参数元组。 kwargs表示调用对象的字典。 name为进程的别名。 group实质上不使用,为None。 方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程,并自动调用run方法. 属性:authkey、daemon(要通过start()设置,必须设置在方法start之前)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。 2.3 创建单进程


单线程比较简单,创建一个 Process的实例对象就好,传入参数 target 为已经定义好的方法worker以及worker需要的参数

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import multiprocessing
  4. import datetime, time
  5. def worker(interval):
  6. print("process start: {0}".format(datetime.datetime.today()));
  7. time.sleep(interval)
  8. print("process end: {0}".format(datetime.datetime.today()));
  9. if __name__ == "__main__":
  10. p = multiprocessing.Process(target=worker, args=(5,))
  11. p.start()
  12. p.join()
  13. print "end!"

2.4 创建多进程


  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. """
  4. author: yangyi@youzan.com
  5. time: 2017/7/2 下午7:50
  6. func:
  7. """
  8. import multiprocessing
  9. def worker(num):
  10. print "worker %d" %num
  11. if __name__ == "__main__":
  12. print("The number of CPU is:" + str(multiprocessing.cpu_count()))
  13. proc = []
  14. for i in xrange(5):
  15. p = multiprocessing.Process(target=worker, args=(i,))
  16. proc.append(p)
  17. for p in proc:
  18. p.start()
  19. for p in proc:
  20. p.join()
  21. print "end ..."

输出

  1. The number of CPU is:4
  2. worker 0
  3. worker 1
  4. worker 2
  5. worker 3
  6. worker 4
  7. main process end ...

2.5 线程池


multiprocessing提供进程池的类--Pool,它可以指定程序最大可以调用的进程数量,当有新的请求提交到pool中时,如果进程池还没有满,那么就会创建一个新的进程用来执行该请求;但如果进程池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。 构造方法: Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) processes : 使用的工作进程的数量,如果processes是None,默认使用os.cpu_count()返回的数量。 initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。 context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。 实例方法:   apply(func[, args[, kwds]]):同步进程池   apply_async(func[, args[, kwds[, callback[, error_callback]]]]) :异步进程池   close() : 关闭进程池,阻止更多的任务提交到pool,待任务完成后,工作进程会退出。   terminate() : 结束工作进程,不在处理未完成的任务.   join() : 等待工作线程的退出,在调用join()前必须调用close()或者 terminate(),因为被终止的进程需要被父进程调用wait(join等价与wait),否则进程会成为僵尸进程。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. from multiprocessing import Pool
  4. import time
  5. def worker(num):
  6. print "worker %d" %num
  7. time.sleep(2)
  8. print "end worker %d" %num
  9. if __name__ == "__main__":
  10. proc_pool = Pool(2)
  11. for i in xrange(4):
  12. proc_pool.apply_async(worker, (i,)) #使用了异步调用,从输出结果可以看出来
  13. proc_pool.close()
  14. proc_pool.join()
  15. print "main process end ..."

输出结果

  1. worker 0
  2. worker 1
  3. end worker 0
  4. end worker 1
  5. worker 2
  6. worker 3
  7. end worker 2
  8. end worker 3
  9. main process end ..

解释:创建一个进程池pool 对象proc_pool,并设定进程的数量为2,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为2,所以0、1会直接送到进程中执行,当其中的2个任务执行完之后才空出2进程处理对象2和3,所以会出现输出 worker 2 worker 3 出现在end worker 0 end worker 1之后。思考一下如果调用 proc_pool.apply(worker, (i,)) 的输出结果会是什么样的? 2.6 使用queue


multiprocessing提供队列类,可以通过调用multiprocessing.Queue(maxsize) 初始化队列对象,maxsize表示队列里面最多的元素个数。 例子 创建了两个函数入队,出队,出队处理时使用了lock特性,串行化取数据。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import time
  4. from multiprocessing import Process, current_process,Lock,Queue
  5. import datetime
  6. def inputQ(queue):
  7. time.sleep(1)
  8. info = "proc_name: " + current_process().name + ' was putted in queue at: ' + str(datetime.datetime.today())
  9. queue.put(info)
  10. def outputQ(queue,lock):
  11. info = queue.get()
  12. lock.acquire()
  13. print ("proc_name: " + current_process().name + ' gets info :' + info)
  14. lock.release()
  15. if __name__ == '__main__':
  16. record1 = [] # store input processes
  17. record2 = [] # store output processes
  18. lock = Lock() # To prevent messy print
  19. queue = Queue(3)
  20. for i in range(10):
  21. process = Process(target=inputQ, args=(queue,))
  22. process.start()
  23. record1.append(process)
  24. for i in range(10):
  25. process = Process(target=outputQ, args=(queue,lock))
  26. process.start()
  27. record2.append(process)
  28. for p in record1:
  29. p.join()
  30. queue.close() # No more object will come, close the queue
  31. for p in record2:
  32. p.join()

2.7 使用pipe


Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。 用法 multiprocessing.Pipe([duplex]) 该类返回一组对象实例(conn1, conn2),分别代表发送和接受消息的两端。

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. from multiprocessing import Process, Pipe
  4. def p1(conn, name):
  5. conn.send('hello ,{name}'.format(name=name))
  6. print "p1 receive :", conn.recv()
  7. conn.close()
  8. def p2(conn, name):
  9. conn.send('hello ,{name}'.format(name=name))
  10. print "p2 receive :", conn.recv()
  11. conn.close()
  12. if __name__ == '__main__':
  13. parent_conn, child_conn = Pipe()
  14. proc1 = Process(target=p1, args=(child_conn, "parent_conn"))
  15. proc2 = Process(target=p2, args=(parent_conn, "child_conn"))
  16. proc1.start()
  17. proc2.start()
  18. proc1.join()
  19. proc2.join()

输出:

  1. p1 receive : hello ,child_conn
  2. p2 receive : hello ,parent_conn

该例子中 p1 p2 通过pipe 给彼此相互发送信息,p1 发送"parent_conn" 给 p2 ,p2 发送"child_conn" 给p1. 2.8 daemon程序对比结果

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.start()
  10. print "end!"

输出:

  1. end!
  2. process start: 2017-07-02 18:47:30.656244
  3. process end: 2017-07-02 18:47:35.657464

设置 daemon = True,程序随着主程序结束而不等待子进程。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.daemon = True
  10. p.start()
  11. print "end!"

输出: end!

因为子进程设置了daemon属性,主进程结束,multiprocessing创建的进程对象就随着结束了。

  1. import multiprocessing
  2. import datetime, time
  3. def worker(interval):
  4. print("process start: {0}".format(datetime.datetime.today()));
  5. time.sleep(interval)
  6. print("process end: {0}".format(datetime.datetime.today()));
  7. if __name__ == "__main__":
  8. p = multiprocessing.Process(target=worker, args=(5,))
  9. p.daemon = True #
  10. p.start()
  11. p.join() #进程执行完毕后再关闭
  12. print "end!"

输出:

  1. process start: 2017-07-02 18:48:20.953754
  2. process end: 2017-07-02 18:48:25.954736

2.9 Lock()


当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。 实例方法: acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。 release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。 例子: 多个进程使用同一个std_out ,使用lock机制确保同一个时刻有一个一个进程获取输出。

  1. #!/usr/bin/env python # encoding: utf-8 from multiprocessing import Process, Lock def func_with_lock(l, i): l.acquire() print 'hello world', i l.release() def func_without_lock(i): print 'hello world', i if __name__ == '__main__': lock = Lock() print "func_with_lock :" for num in range(10): Process(target=func_with_lock, args=(lock, num)).start()

输出:

  1. func_with_lock :
  2. hello world 0
  3. hello world 1
  4. hello world 2
  5. hello world 3
  6. hello world 4
  7. hello world 5
  8. hello world 6
  9. hello world 7
  10. hello world 8
  11. hello world 9

三 小结 本文参考官方资料以及其他资源,对multiprocesssing 的使用方式做了总结,还有很多知识需要详细阅读官方文档。纸上来得终觉浅,绝知此事要躬行。

参考资料 [1]官方文档 [2]Python标准库10 多进程初步 (multiprocessing包)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-07-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档