前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python中的多进程处理

python中的多进程处理

作者头像
py3study
发布2020-01-13 12:23:22
6740
发布2020-01-13 12:23:22
举报
文章被收录于专栏:python3python3

  众所周知,python本身是单线程的,python中的线程处理是由python解释器分配时间片的;但在python 3.0中吸收了开源模块,开始支持系统原生的进程处理——multiprocessing. 注意:这个模块的某些函数需要操作系统的支持,例如,multiprocessing.synchronize模块在某些平台上引入时会激发一个ImportError 1)Process   要创建一个Process是很简单的。

  1. from multiprocessing import Process
  2. def f(name):
  3. print('hello', name)
  4. if __name__ == '__main__':
  5.      p = Process(target=f, args=('bob',))
  6.      p.start()
  7.      p.join()

  要获得一个Process的进程ID也是很简单的。

  1. from multiprocessing import Process
  2. import os
  3. def info(title):
  4. print title
  5. print 'module name:', __name__
  6. print 'parent process:', os.getppid()#这个测试不通过,3.0不支持
  7. print 'process id:', os.getpid()
  8. def f(name):
  9.     info('function f')
  10. print 'hello', name
  11. if __name__ == '__main__':
  12.     info('main line')
  13.     p = Process(target=f, args=('bob',))
  14.     p.start()
  15.     p.join()

  创建进程:multiprocessing.Process([group[, target[, name[, args[, kargs]]]]])   参数:   group:    None,它的存在仅仅是为了与threading.Thread兼容   target:   一般是函数   name:     进程名   args:     函数的参数   kargs:    keywords参数   函数:   run()                  默认的run()函数调用target的函数,你也可以在子类中覆盖该函数   start()                启动该进程   join([timeout])        父进程被停止,直到子进程被执行完毕。                          当timeout为None时没有超时,否则有超时。                          进程可以被join很多次,但不能join自己   is_alive()               terminate()            结束进程。                          在Unix上使用的是SIGTERM                          在Windows平台上使用TerminateProcess   属性:   name                   进程名   daemon                 守护进程   pid                    进程ID   exitcode               如果进程还没有结束,该值为None   authkey                    2)Queue   Queue类似于queue.Queue,一般用来进程间交互信息   例子:

  1. from multiprocessing import Process, Queue
  2. def f(q):
  3.     q.put([42, None, 'hello'])
  4. if __name__ == '__main__':
  5.      q = Queue()
  6.      p = Process(target=f, args=(q,))
  7.      p.start()
  8. print(q.get())    # prints "[42, None, 'hello']"
  9.      p.join()

  注意:Queue是进程和线程安全的。   Queue实现了queue.Queue的大部分方法,但task_done()和join()没有实现。      创建Queue:multiprocessing.Queue([maxsize])   函数:   qsize()                             返回Queue的大小   empty()                             返回一个boolean值表示Queue是否为空   full()                              返回一个boolean值表示Queue是否满   put(item[, block[, timeout]])         put_nowait(item)   get([block[, timeout]])   get_nowait()   get_no_wait()   close()                             表示该Queue不在加入新的元素   join_thread()                         cancel_join_thread() 3)JoinableQueue   创建:multiprocessing.JoinableQueue([maxsize])   task_done()   join() 4)Pipe

  1. from multiprocessing import Process, Pipe
  2. def f(conn):
  3.     conn.send([42, None, 'hello'])
  4.     conn.close()
  5. if __name__ == '__main__':
  6.     parent_conn, child_conn = Pipe()
  7.     p = Process(target=f, args=(child_conn,))
  8.     p.start()
  9. print(parent_conn.recv())   # prints "[42, None, 'hello']"
  10.     p.join()

  multiprocessing.Pipe([duplex])      返回一个Connection对象 5)异步化synchronization

  1. from multiprocessing import Process, Lock
  2. def f(l, i):
  3.     l.acquire()
  4. print('hello world', i)
  5.     l.release()
  6. if __name__ == '__main__':
  7.     lock = Lock()
  8. for num in range(10):
  9.         Process(target=f, args=(lock, num)).start()

6)Shared Memory

  1. from multiprocessing import Process, Value, Array
  2. def f(n, a):
  3.     n.value = 3.1415927
  4. for i in range(len(a)):
  5.         a[i] = -a[i]
  6. if __name__ == '__main__':
  7.     num = Value('d', 0.0)
  8.     arr = Array('i', range(10))
  9.     p = Process(target=f, args=(num, arr))
  10.     p.start()
  11.     p.join()
  12. print(num.value)
  13. print(arr[:])

1>Value 2>Array 7)Manager

  1. from multiprocessing import Process, Manager
  2. def f(d, l):
  3.     d[1] = '1'
  4.     d['2'] = 2
  5.     d[0.25] = None
  6.     l.reverse()
  7. if __name__ == '__main__':
  8.     manager = Manager()
  9.     d = manager.dict()
  10.     l = manager.list(range(10))
  11.     p = Process(target=f, args=(d, l))
  12.     p.start()
  13.     p.join()
  14. print(d)
  15. print(l)

8)Pool

  1. from multiprocessing import Pool
  2. def f(x):
  3. return x*x
  4. if __name__ == '__main__':
  5.     pool = Pool(processes=4)              # start 4 worker processes
  6.     result = pool.apply_async(f, [10])     # evaluate "f(10)" asynchronously
  7. print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
  8. print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

multiprocessing.Pool([processes[, initializer[, initargs]]]) 函数:   apply(func[, args[, kwds]])   apply_async(func[, args[, kwds[, callback]]])   map(func,iterable[, chunksize])   map_async(func,iterable[, chunksize[, callback]])   imap(func, iterable[, chunksize])   imap_unordered(func, iterable[, chunksize])   close()   terminate()   join()

  1. from multiprocessing import Pool
  2. def f(x):
  3. return x*x
  4. if __name__ == '__main__':
  5.     pool = Pool(processes=4)              # start 4 worker processes
  6.     result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously
  7. print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow
  8. print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"
  9.     it = pool.imap(f, range(10))
  10. print(next(it))                       # prints "0"
  11. print(next(it))                       # prints "1"
  12. print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow
  13. import time
  14.     result = pool.apply_async(time.sleep, (10,))
  15. print(result.get(timeout=1))          # raises TimeoutError

9)杂项 multiprocessing.active_children()          返回所有活动子进程的列表 multiprocessing.cpu_count()                返回CPU数目 multiprocessing.current_process()          返回当前进程对应的Process对象 multiprocessing.freeze_support() multiprocessing.set_executable() 10)Connection对象 send(obj) recv() fileno() close() poll([timeout]) send_bytes(buffer[, offset[, size]]) recv_bytes([maxlength]) recv_bytes_info(buffer[, offset]) 

  1. >>> from multiprocessing import Pipe
  2. >>> a, b = Pipe()
  3. >>> a.send([1, 'hello', None])
  4. >>> b.recv()
  5. [1, 'hello', None]
  6. >>> b.send_bytes('thank you')
  7. >>> a.recv_bytes()
  8. 'thank you'
  9. >>> import array
  10. >>> arr1 = array.array('i', range(5))
  11. >>> arr2 = array.array('i', [0] * 10)
  12. >>> a.send_bytes(arr1)
  13. >>> count = b.recv_bytes_into(arr2)
  14. >>> assert count == len(arr1) * arr1.itemsize
  15. >>> arr2
  16. array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-08-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档