前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用concurrent.futures模块并发,实现进程池、线程池

使用concurrent.futures模块并发,实现进程池、线程池

作者头像
用户1214487
发布2018-01-24 10:20:22
7610
发布2018-01-24 10:20:22
举报
文章被收录于专栏:PythonPython

一、关于concurrent.futures模块

Python标准库为我们提供了threading和multiprocessing模块编写相应的异步多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。实现了对threadingmultiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。  concurrent.futures基础模块是executor和future。

  concurrent.futures模块的基础是Exectuor,Executor是一个抽象类,它不能被直接使用。但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却是非常有用,顾名思义两者分别被用来创建线程池和进程池的代码。我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

  Future这个概念相信有java和nodejs下编程经验的朋友肯定不陌生了,你可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。 

二、submit()方法实现进程池/线程池

进程池

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
import os,time,random
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    p=ProcessPoolExecutor()  #不填则默认为cpu的个数
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)   #submit()方法返回的是一个future实例,要得到结果需要用obj.result()
        l.append(obj)

    p.shutdown()  #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用
    print('='*30)
    # print([obj for obj in l])
    print([obj.result() for obj in l])
    print(time.time()-start)

    #上面方法也可写成下面的方法
    # start = time.time()
    # with ProcessPoolExecutor() as p:   #类似打开文件,可省去.shutdown()
    #     future_tasks = [p.submit(task, i) for i in range(10)]
    # print('=' * 30)
    # print([obj.result() for obj in future_tasks])
    # print(time.time() - start)

线程池

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import threading
import os,time,random
def task(n):
    print('%s:%s is running' %(threading.currentThread().getName(),os.getpid()))
    time.sleep(2)
    return n**2

if __name__ == '__main__':
    p=ThreadPoolExecutor()   #不填则默认为cpu的个数*5
    l=[]
    start=time.time()
    for i in range(10):
        obj=p.submit(task,i)
        l.append(obj)
    p.shutdown()
    print('='*30)
    print([obj.result() for obj in l])
    print(time.time()-start)

#上面方法也可写成下面的方法
    # start = time.time()
    # with ThreadPoolExecutor() as p:   #类似打开文件,可省去.shutdown()
    #     future_tasks = [p.submit(task, i) for i in range(10)]
    # print('=' * 30)
    # print([obj.result() for obj in future_tasks])
    # print(time.time() - start)

默认为异步执行

代码语言:javascript
复制
#p.submit(task,i).result()即同步执行
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    p=ProcessPoolExecutor()
    start=time.time()
    for i in range(10):
        res=p.submit(task,i).result()
        print(res)
    print('='*30)
    print(time.time()-start)

三、回调函数

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
    print('%s:<%s> is getting [%s]' %(currentThread().getName(),os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    return {'url':url,'text':response.text}
def parse_page(res):  #此处的res是一个p.submit获得的一个future对象,不是结果
    res=res.result()  #res.result()拿到的才是对应的结果
    print('%s:<%s> parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    # p=ProcessPoolExecutor()
    p=ThreadPoolExecutor()
    urls = [
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
        'https://www.baidu.com',
    ]

    for url in urls:
        # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
        p.submit(get_page, url).add_done_callback(parse_page) #与之前的回调函数拿到的结果不同,这里拿到的是前面submit方法执行完后返回的对象,要.result才能拿到对应的结果
    p.shutdown()
    print('主',os.getpid())

四、map方法

和内置函数map差不多的用法,这个方法返回一个map(func, *iterables)迭代器,迭代器中的回调执行返回的结果有序的。

以下是通过concurrent.futures模块下类ThreadPoolExecutor和ProcessPoolExecutor实例化的对象的map方法实现进程池、线程池

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
def task(n):
    print('%s is running' %os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    # p=ProcessPoolExecutor()
    p=ThreadPoolExecutor()
    start = time.time()
    obj=p.map(task,range(10))
    p.shutdown()
    print('='*30)
    print(list(obj))
    print(time.time() - start)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017-08-31 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、关于concurrent.futures模块
  • 二、submit()方法实现进程池/线程池
    • 进程池
      • 线程池
      • 三、回调函数
      • 四、map方法
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档