前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >python 进程池Pool

python 进程池Pool

作者头像
Devops海洋的渔夫
发布2019-06-02 13:53:00
9950
发布2019-06-02 13:53:00
举报
文章被收录于专栏:Devops专栏Devops专栏

进程池Pool

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

代码语言:javascript
复制
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool
import time
import os
import random


def smoke(num):
    t_start = time.time() # 记录开始时间
    print("start %d smoke !!!! and pid = %d" % (num,os.getpid()))
    time.sleep(random.random()*2) # random.random随机生成0~1之间的浮点数
    t_stop = time.time()
    print("finish %d smoke, time = %0.2f" % (num,(t_start-t_stop)))

def main():

    # 创建进程池Pool
    po = Pool(3) # 定义一个进程池,最大进程数为3

    # 编写一个循环,加入进程池中
    for i in range(0,10):
        print("------循环 %d --------" % i)
        # Pool().apply_async(调用的目标函数,(传递的参数元组))
        # 每次循环会用空闲出来的子进程去调用目标
        po.apply_async(smoke,(i,))

    print("----start-----")
    po.close() # 关闭进程池
    po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
    print("----end-----")

if __name__ == "__main__":
   main()

执行如下:

代码语言:javascript
复制
[root@server01 process]# python pool.py 
------循环 0 --------
------循环 1 --------
------循环 2 --------
------循环 3 --------
------循环 4 --------
------循环 5 --------
------循环 6 --------
------循环 7 --------
------循环 8 --------
------循环 9 --------
----start-----
start 0 smoke !!!! and pid = 1656
start 1 smoke !!!! and pid = 1657
start 2 smoke !!!! and pid = 1655
finish 0 smoke, time = -0.10
start 3 smoke !!!! and pid = 1656
finish 3 smoke, time = -0.87
start 4 smoke !!!! and pid = 1656
finish 2 smoke, time = -0.98
start 5 smoke !!!! and pid = 1655
finish 1 smoke, time = -1.67
start 6 smoke !!!! and pid = 1657
finish 4 smoke, time = -1.51
start 7 smoke !!!! and pid = 1656
finish 5 smoke, time = -1.59
start 8 smoke !!!! and pid = 1655
finish 7 smoke, time = -0.17
start 9 smoke !!!! and pid = 1656
finish 6 smoke, time = -1.17
finish 8 smoke, time = -1.60
finish 9 smoke, time = -1.62
----end-----
[root@server01 process]# 

可以从执行的结果看出来,在进行完毕循环的过程中,将方法加入进程池并不会被堵塞,而是被存储起来了,然后再三个进程进行调用。 从下面的调用顺序来看,以此是 1656 --》 1657 --》 1655 三个子进程依次调用方法。

multiprocessing.Pool常用函数解析:

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表;

  • close():关闭Pool,使其不再接受新的任务;
  • terminate():不管任务是否完成,立即终止;
  • join():主进程阻塞,等待子进程的退出, 必须在closeterminate之后使用;

进程池中的Queue - 传递信息:fat boss,come on tobacco

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

下面的实例演示了进程池中的进程如何通信:

代码语言:javascript
复制
# -*- coding:utf-8 -*-
from multiprocessing import Process
from multiprocessing import Pool,Manager
import time
import os
import random


def fatboy_libai(q):
    print("fatboy_libai pid = %d" % os.getpid()) # 打印子进程号pid
    for i in range(10):
        print("-- %d -- put msg to queue" % i)
        q.put("fat boss,come on tabacco!!")   # 给队列写入内容

def fatboss(q):
    print("fatboss pid = %d" % os.getpid()) # 打印子进程号pid
    print("queue size=",q.qsize())
    # 读取队列信息
    for i in range(q.qsize()):
        print("-- %d -- get msg from queue" % i)
        print("Queue %d data=%s" % (i,q.get(True)))

def main():
    
    # 创建进程池Pool
    po = Pool() # 定义一个进程池
    
    # 创建一个进程池的队列
    q = Manager().Queue() 
   
    # 进程调用肥仔白的方法,将信息写入队列中
    po.apply_async(fatboy_libai,(q,))
   
    time.sleep(1) # 等待一下,先让上面的任务向Queue写入数据,然后再用进程调用胖子老板的方法,取数据

    # 进程调用胖子老板的方法,读取队列信息
    po.apply_async(fatboss,(q,)) 

    po.close() # 关闭进程池
    po.join() # 等待进程池所有子进程执行完毕,必须在close语句之后
    print("----end-----")

if __name__ == "__main__":
   main()

执行如下:

代码语言:javascript
复制
[root@server01 process]# python pool3.py 
fatboy_libai pid = 2178
-- 0 -- put msg to queue
-- 1 -- put msg to queue
-- 2 -- put msg to queue
-- 3 -- put msg to queue
-- 4 -- put msg to queue
-- 5 -- put msg to queue
-- 6 -- put msg to queue
-- 7 -- put msg to queue
-- 8 -- put msg to queue
-- 9 -- put msg to queue
fatboss pid = 2178
('queue size=', 10)
-- 0 -- get msg from queue
Queue 0 data=fat boss,come on tabacco!!
-- 1 -- get msg from queue
Queue 1 data=fat boss,come on tabacco!!
-- 2 -- get msg from queue
Queue 2 data=fat boss,come on tabacco!!
-- 3 -- get msg from queue
Queue 3 data=fat boss,come on tabacco!!
-- 4 -- get msg from queue
Queue 4 data=fat boss,come on tabacco!!
-- 5 -- get msg from queue
Queue 5 data=fat boss,come on tabacco!!
-- 6 -- get msg from queue
Queue 6 data=fat boss,come on tabacco!!
-- 7 -- get msg from queue
Queue 7 data=fat boss,come on tabacco!!
-- 8 -- get msg from queue
Queue 8 data=fat boss,come on tabacco!!
-- 9 -- get msg from queue
Queue 9 data=fat boss,come on tabacco!!
----end-----
[root@server01 process]# 
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 进程池Pool
  • multiprocessing.Pool常用函数解析:
  • 进程池中的Queue - 传递信息:fat boss,come on tobacco
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档