Python3中concurrent.futures模块介绍

写在前面

第一次接触futures模块是在tornado中,tornado4.0版本使用futures频率很高,因为tornado本身的corouting的异步功能,是需要逻辑里面所调用的模块本身就支持异步才可以实现,而futures模块恰恰支持异步。在futures模块中,我们关心的是Executor和Future这两个类。

Executor是具体异步执行器的抽象基类,具有两个子类ThreadPoolExecutor和ProcessPoolExecutor ;一般使用Executor.submit返回一个Future对象,即异步计算的结果。future是一个未来可期的对象,通过它可以获悉线程(进程)的状态,在主线程(或进程)中可以获取某一个线程(进程)执行的状态或某一个任务执行的状态及返回值。

方法介绍

1、submit方法

submit方法存在于concurrent.futures.Executor类中,查看一下源码:

    def submit(self, fn, *args, **kwargs):
        """Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as fn(*args, **kwargs) and returns
        a Future instance representing the execution of the callable.

        Returns:
            A Future representing the given call.
        """
        raise NotImplementedError()

其中,fn是需要异步执行的函数,submit返回的是一个future对象,注意 submit是非阻塞的,添加完立即返回。模拟一下爬取某个页面的场景,注意使用done方法判断任务是否已完成:

from concurrent.futures import ThreadPoolExecutor
import time

def spider(page):
    time.sleep(page)
    print("抓取任务{page}完成!".format(page=page))
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    task_1 = t.submit(spider, 1)
    task_2 = t.submit(spider, 2)  # 通过submit提交待执行函数到线程池中
    task_3 = t.submit(spider, 3)

    print("任务1:", task_1.done())
    print("任务2:", task_2.done())  # 通过done方法判断任务是否完成
    print("任务3:", task_3.done())

    time.sleep(1.5)
    print("任务1:", task_1.done())
    print("任务2:", task_2.done())
    print("任务3:", task_3.done())
    print(task_1.result())
    print(task_2.result())

#运行结果:
任务1:False
任务2:False
任务3:False
抓取任务1完成!
任务1:True
任务2:False
任务3:False
1
抓取任务2完成!
2
抓取任务3完成!

在上述代码中使用 with 语句 (因为含有__enter____exit__这两个魔法函数),通过 ThreadPoolExecutor 构造实例,同时传入 max_workers 参数来设置线程池中最多能同时运行的线程数目,此处为5个;接着使用submit 方法将待执行的任务提交到线程池中,并返回该任务的句柄(类似于文件中常使用的f),再次强调submit方法是非阻塞的,提交后立即返回。最后使用 done方法来判断该任务是否结束。

运行结果可以看出,提交任务后立即判断任务状态时,显示3个任务都未完成。过了1.5秒后再次判断,发现task_1已执行完毕,task_2和task_3 还在执行中,最后主程序结束,task_2和task_3也执行完毕输出任务完成的字眼。使用Future类中的result方法获取任务的返回值,即图中的_result的值:

线程池虽然好用,但是也要注意产生死锁,看下面这个官方给出的例子:

import time
from concurrent.futures import ThreadPoolExecutor

def wait_on_b():
    time.sleep(3)
    print(b.result())   # b不会完成,因为它一直在等待a返回的结果
    return 3

def wait_on_a():
    time.sleep(3)
    print(a.result())  # a不会完成,因为它一直在等待b返回的结果
    return 3

executor = ThreadPoolExecutor(max_workers=3)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

这个程序是典型的因为互相调用而造成了死锁。

2、wait方法

wait方法存在于concurrent.futures模块中,它的源码如下所示:

def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the futures in the given sequence to complete.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            wait upon.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        return_when: Indicates when this function should return. The options
            are:

            FIRST_COMPLETED - Return when any future finishes or is
                              cancelled.
            FIRST_EXCEPTION - Return when any future finishes by raising an
                              exception. If no future raises an exception
                              then it is equivalent to ALL_COMPLETED.
            ALL_COMPLETED -   Return when all futures finish or are cancelled.

    Returns:
        A named 2-tuple of sets. The first set, named 'done', contains the
        futures that completed (is finished or cancelled) before the wait
        completed. The second set, named 'not_done', contains uncompleted
        futures.
    """

从源码可知wait方法接受三个参数,fs表示需要执行的序列;timeout表示等待的最大时间,如果超过这个时间即使线程未执行完也将返回;return_whe表示wait返回结果的条件,默认为 ALL_COMPLETED即全部执行完成再返回。wait方法最后返回结果是两个集合,一个是已完成的future对象;另一个是未完成的future对象。继续使用之前那个例子来介绍wait方法的具体使用:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
import time

def spider(page):
    time.sleep(page)
    print("抓取任务{page}完成!".format(page=page))
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    task_list = [t.submit(spider, page) for page in range(1, 4)]  # 通过submit提交待执行函数到线程池中
    wait(fs=task_list, timeout=0, return_when=FIRST_COMPLETED)
    print("任务已完成")
    print(wait(fs=task_list, timeout=2.5))

# 运行结果:
任务已完成
抓取任务1完成!
抓取任务2完成!
DoneAndNotDoneFutures(done={<Future at 0x4c90a10 state=finished returned int>, <Future at 0x4dea9f0 state=finished returned int>}, not_done={<Future at 0x4deaaf0 state=running>})
抓取任务3完成!

在上述代码中设置的返回条件是:当完成第一个任务的时候,就停止等待,继续执行主线程任务;由于设置了超时,可以看到只有任务3还未完成。

3、as_completed方法

as_completed方法存在于concurrent.futures模块中,它的源码如下所示:

def as_completed(fs, timeout=None):
    """An iterator over the given futures that yields each as it completes.

    Args:
        fs: The sequence of Futures (possibly created by different Executors) to
            iterate over.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator that yields the given Futures as they complete (finished or
        cancelled). If any given Futures are duplicated, they will be returned
        once.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
    """

尽管future模块提供了done方法用于判断任务是否执行完成,但是不能在主线程中一针对某个任务都写一个判断语句。最好是当某个任务结束了,它自动给主线程返回一个结果,而不是一直判断每个任务是否结束,此时as_completed方法就派上用场了。as_completed有两个参数,fs是future对象构成的序列,timeout是等待的最小秒数。而它最后返回的是一个迭代器,如果有重复也只是返回第一次出现的那个。依旧使用上面的例子来介绍as_completed方法:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def spider(page):
    time.sleep(page)
    print("抓取任务{page}完成!".format(page=page))
    return page

with ThreadPoolExecutor(max_workers=5) as t:  # 创建一个最大容纳数量为5的线程池
    obj_list = []
    for page in range(1, 4):
        obj = t.submit(spider, page)
        obj_list.append(obj)

    for future in as_completed(obj_list):
        data = future.result()
        print("返回结果是:", data)

# 运行结果:
抓取任务1完成!
返回结果是:1
抓取任务2完成!
返回结果是:2
抓取任务3完成!
返回结果是:3

前面也说了as_completed返回一个生成器,在没有任务完成的时候,会一直阻塞,除非设置了 timeout。当有某个任务完成的时候,会 yield 这个任务,就能执行 for 循环下面的语句,然后继续阻塞住,循环到所有的任务结束。同时先完成的任务会先返回给主线程用于输出。

4、map方法

map方法存在于concurrent.futures.Executor类中,查看一下源码:

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor.

        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.

map方法有4个参数,其中fn是需要线程执行的函数;iterables是可迭代对象;timeout是超时时间,这个和wait方法中的 timeout 一样,但由于 map 是返回线程执行的结果。因此若timeout值小于线程执行时间则会抛TimeoutError异常;chunksize是iterable在传递给子进程之前,被拆分块的大小,默认为1即不拆分。注意chunksize参数只在ProcessPoolExecutor中使用,ThreadPoolExecutor中经常忽略不使用。老规矩依旧使用上面的例子来介绍map方法:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def spider(page):
    time.sleep(page)
    print("抓取任务{page}完成!".format(page=page))
    return page

executor = ThreadPoolExecutor(max_workers=3)
i = 1
for result in executor.map(spider, [3, 1, 2]):
    print("第{i}个页面结果是:{result}".format(i=i, result=result))
    i += 1

# 运行结果:
抓取任务1完成!
抓取任务2完成!
抓取任务3完成!
第1个页面结果是:3
第2个页面结果是:1
第3个页面结果是:2

不知道你发现没有,使用 map方法时,不需要再次使用submit 方法。因为submit方法的作用就是将序列中的每个元素都执行同一个函数。而此处的map方法与 python 高阶map函数的含义相同,也都是将序列中的每个元素都执行同一个函数。所以就不需要再次使用submit 方法了。

从运行结果可以看出,与上面采用as_completed方法输出的结果不同,最后的输出顺序与列表的顺序一致,而且就算只花费1s的任务先执行完成,也会先打印前面提交的任务返回的结果,即map方法返回的顺序是你提交序列的顺序(有序),如此看来这个map方法其实也是非阻塞的。

注意在上述代码中最好是使用with,而是for。如果你一定要使用for,那么一定要手动调用executor.shutdown方法用于释放资源。若使用了with方法的话,其内部已经实现了wait()方法,故在使用完毕后可自行关闭线程池,减少资源浪费。

其他方法

其实future模块中还提供了其他方法,这里简单列举一下:

Future.cancel()  # 用于终止某个线程和进程的任务,返回状态为 True或者False
Future.cancelled()  # 判断任务是否真的结束了
Future.running()  # 判断任务是否还在运行
Future.done()  # 前面说过判断任务是否是正常执行完的
Future.result(timeout=None)  # 对result的结果进行超时设置

实战训练

接下来通过抓取我个人博客网站来仔细了解一下使用单线程和使用线程池之间的运行效率对比试验。

使用单线程的代码:

import time
import requests

headers = {
    "Host": "blog.licheetools.top",
    "Origin": "https://blog.licheetools.top",
    "User-Agent": "Chrome/70.0.3538.110",
}

def spider(url):
    response = requests.get(url, headers=headers)
    if response:
        return response.status_code
    else:
        return None

def main():
    start_time = time.time()
    for page in range(1, 18):
        url = "https://blog.licheetools.top/page/{page}/".format(page=page)
        data = spider(url)
        print(data)
        print('*' * 20)
    times = time.time() - start_time
    print(times)

if __name__ == "__main__":
    main()

运行时间如下:

15.47508978843689

再来看一下使用线程池时的代码:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests

headers = {
    "Host": "blog.licheetools.top",
    "Origin": "https://blog.licheetools.top",
    "User-Agent": "Chrome/70.0.3538.110",
}

def spider(url):
    response = requests.get(url, headers=headers)
    if response:
        return response.status_code
    else:
        return None

def main():
    with ThreadPoolExecutor(max_workers=8) as t:
        obj_list = []
        start_time = time.time()
        for page in range(1, 18):
            url = "https://blog.licheetools.top/page/{page}/".format(page=page)
            obj = t.submit(spider, url)
            obj_list.append(obj)

        for future in as_completed(obj_list):
            data = future.result()
            print(data)
            print('*' * 20)
        times = time.time() - start_time
        print(times)

if __name__ == "__main__":
    main()

运行结果:

2.055521011352539

其实你也能感受到运行的快慢,毕竟这两者之间的差别实在是太大了,这还是17个页面的比较,当涉及到成百上千时,那差别就更大了。如此关于Python中concurrent.futures模块的介绍就到这里。

本文分享自微信公众号 - 啃饼思录(kbthinking),作者:i思录

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • concurrent.futures进行

    Python中进行并发编程一般使用threading和multiprocessing模块,不过大部分的并发编程任务都是派生一系列线程,从队列中收集资源,然后用队...

    py3study
  • 只需几行代码,即可实现多线程和多进程操作

    原题 | PYTHON: A QUICK INTRODUCTION TO THE CONCURRENT.FUTURES MODULE

    kbsc13
  • 只需几行代码,即可实现多线程和多进程操作

    concurrent.futures 是标准库里的一个模块,它提供了一个实现异步任务的高级 API 接口。本文将通过一些代码例子来介绍这个模块常见的用法。

    王图思睿
  • Python中编写并发程序

    在Python中,由于历史原因(GIL),使得Python中多线程的效果非常不理想.GIL使得任何时刻Python只能利用一个CPU核,并且它的调度算法简单粗暴...

    py3study
  • Angular中ngCookies模块介绍

    1.Cookie介绍 Cookie总是保存在客户端中,按在客户端中的存储位置,可分为内存Cookie和硬盘Cookie。内存Cookie由浏览器维护,保存在内...

    八哥
  • Python3~Json模块dumps、loads、dump、load函数介绍

    json.dumps()用于将dict类型的数据转成str,因为如果直接将dict类型的数据写入json文件中会发生报错,因此在将数据写入时需要用到该函数。

    双面人
  • 用python3的多进程和协程处理MyS

    本文介绍用python3的多进程 + 协程处理MySQL的数据,主要逻辑是拉取MySQL的数据,然后使用flashtext匹配关键字,在存回MySQL,代码如下...

    py3study
  • Python并发编程应该使用哪个标准库?

    并发编程是刚需,尤其是在多 I/O 操作时,多线程,协程,多进程三路英雄各显神通。多线程,协程属于并发操作,多进程属于并行操作,那么你是否清楚了什么是并发,什么...

    somenzz
  • 3行代码让Python数据处理脚本获得4倍提速

    Python是一门非常适合处理数据和自动化完成重复性工作的编程语言,我们在用数据训练机器学习模型之前,通常都需要对数据进行预处理,而Python就非常适合完成这...

    一墨编程学习
  • Ansible模块介绍

    描述:ansible使用ansible-doc --list可以看见所有的模块,ansble-doc -s模块名称显示模块使用详情;

    WeiyiGeek
  • Qt模块介绍

    Qt5中,模块已经非常细化了,关于模块的详细信息可以在帮助文档中看到。Qt的帮助文档叫Qt助手。为了方便使用,我们一般都是把它固定在任务栏。

    zy010101
  • Python_模块介绍

                                      a、yum命令

    py3study
  • Python简述

    小编给读者们分享一下Python线程池模块ThreadPoolExecutor用法,文中结合实例形式分析了Python线程池模块ThreadPoolExecut...

    py3study
  • 3行代码实现 Python 并行处理,速度提高6倍!

    原标题:Here’s how you can get a 2–6x speed-up on your data pre-processing with Pyth...

    数说君
  • fist模板模块介绍

    fist(https://github.com/fanux/fist)是sealyun开源的一款k8s管理软件,不同于其它管理软件的是其奉承的原则是...

    sealyun
  • Python3 中 random模块

    #用于生成一个指定范围内的随机符点数,两个参数其中一个是上限,一个是下限。如果a > b,则生成的随机数n: a <= n <= b。如果 a <b, 则 b ...

    py3study
  • python3中argparse模块

    如:[root@openstack_1 /]# ls root/ #其中root/是位置参数

    狼啸风云
  • Python之模块介绍

    os.makedirs('dirname1/dirname2')  可生成多层递归目录

    py3study
  • PCL中IO模块和类的介绍

    (1)class pcl::FIleReader:定义了PCD文件的读取接口,主要用作其他读取类的父类 pcl::FileReader有pcl::PCDRe...

    点云PCL博主

扫码关注云+社区

领取腾讯云代金券