
Learn Parallel Computing in Python in One Article

MeteoAI · published 2019-07-24 16:11:52

To run code in multiple threads or processes, Python programmers usually reach for the threading and multiprocessing modules in the standard library.

Since Python 3.2, however, the standard library has also provided the concurrent.futures module. It offers two classes, ThreadPoolExecutor and ProcessPoolExecutor, which are higher-level abstractions over threading and multiprocessing, letting a program run work in parallel with only a few lines of code.

ThreadPoolExecutor and ProcessPoolExecutor

The foundation of the concurrent.futures module is the abstract Executor class (with the map, submit and shutdown methods), but it is not meant to be used directly.

Instead we work with its two subclasses, ThreadPoolExecutor and ProcessPoolExecutor, which create a thread pool and a process pool respectively.

Once a project reaches a certain scale, creating and destroying threads or processes over and over becomes expensive, and it pays to keep a pool of them around, trading space for time.

We can hand tasks straight to the thread/process pool without maintaining a Queue ourselves or worrying about deadlocks; the pool schedules the tasks for us.
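
Both pool classes implement the same Executor interface (submit, map, shutdown), so the same code can be switched between threads and processes just by swapping the class. Below is a minimal sketch of that shared interface; the square function is only a placeholder task, not part of the original examples:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def square(x):  # placeholder CPU-light task
    return x * x

def run_with(executor_cls):
    # the with statement calls shutdown(wait=True) automatically on exit
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, range(5)))

if __name__ == '__main__':  # required for ProcessPoolExecutor on "spawn" platforms
    print(run_with(ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
    print(run_with(ProcessPoolExecutor))  # same result, computed in worker processes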

1. Creating a thread pool with ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor
import time

t0 = time.time()


def return_future_result(message1, message2):
    s = 0
    for i in range(50000000):  # CPU-bound busy loop to keep the worker occupied
        s *= i
    return message1, message2

pool = ThreadPoolExecutor(max_workers=3)  # create a pool with at most 3 worker threads

future1 = pool.submit(return_future_result, message1="hello",
                      message2='111')  # submit a task to the thread pool
future2 = pool.submit(return_future_result, message2="world",
                      message1='222')  # submit another task to the thread pool

print(future1.done())  # has task 1 finished?
print(future2.done())  # has task 2 finished?
print(future1.result())  # result of task 1 (blocks until it is ready)
print(future2.result())  # result of task 2
print(future1.done())  # has task 1 finished now?
print(future2.done())  # has task 2 finished now?
t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")

Output:


False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 9.325

2. Creating a process pool with ProcessPoolExecutor

from concurrent.futures import ProcessPoolExecutor
import time

t0 = time.time()

def return_future_result(message1, message2):
    s = 0
    for i in range(50000000):  # CPU-bound busy loop to keep the worker occupied
        s *= i
    return message1, message2


# Note: on platforms that use the "spawn" start method (e.g. Windows), the code
# below should be run under an `if __name__ == '__main__':` guard.
pool = ProcessPoolExecutor(max_workers=3)  # create a pool with at most 3 worker processes

future1 = pool.submit(return_future_result, message1="hello",
                      message2='111')  # submit a task to the process pool
future2 = pool.submit(return_future_result, message2="world",
                      message1='222')  # submit another task to the process pool

print(future1.done())  # has task 1 finished?
print(future2.done())  # has task 2 finished?
print(future1.result())  # result of task 1 (blocks until it is ready)
print(future2.result())  # result of task 2
print(future1.done())  # has task 1 finished now?
print(future2.done())  # has task 2 finished now?

t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")

Output:


False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 3.7551

The process pool takes noticeably less time than the thread pool. Because of the GIL (global interpreter lock), multithreading cannot achieve true parallelism for CPU-bound code; multiprocessing instead runs several interpreters as child processes, each with its own independent GIL, which lets a Python program fully exploit multiple CPU cores for parallel computation.
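
One caveat worth adding: the GIL is released while a thread blocks on I/O (or time.sleep), so for I/O-bound work a thread pool does give a real speedup. A minimal sketch, using time.sleep as a stand-in for network or disk I/O:

from concurrent.futures import ThreadPoolExecutor
import time

def fake_io(i):
    time.sleep(1)  # stands in for a blocking I/O call; the GIL is released while sleeping
    return i

t0 = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fake_io, range(5)))
t1 = time.time()
print(results, f"{t1 - t0:.2f}s")  # roughly 1s instead of the ~5s a sequential loop would take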

3. The Future class [1]

A Future is usually created by Executor.submit(), which wraps a callable for asynchronous execution. It is a convenient way to track the result of an asynchronous call. Commonly used methods are done(), result(), exception() and add_done_callback().
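
A minimal sketch of these Future methods (the work function and the on_done callback are illustrative, not from the original examples):

from concurrent.futures import ThreadPoolExecutor
import time

def work(x):
    time.sleep(0.5)
    if x < 0:
        raise ValueError("negative input")
    return x * 2

def on_done(future):  # called with the finished Future as its only argument
    print("callback fired, exception:", future.exception())

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(work, 3)
    bad = pool.submit(work, -1)
    ok.add_done_callback(on_done)
    bad.add_done_callback(on_done)
    print(ok.done())        # most likely False right after submit
    print(ok.result())      # 6 (blocks until the task finishes)
    print(bad.exception())  # ValueError('negative input'); calling result() would re-raise it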

The map and submit methods

The most commonly used methods of ThreadPoolExecutor and ProcessPoolExecutor are map and submit.

1. map

map(func, *iterables, timeout=None, chunksize=1)

The map method here resembles Python's built-in map[2], with two differences:

• the iterables are collected immediately rather than consumed lazily;
• func is executed asynchronously, so the calls can run concurrently.

Note that when func takes several arguments and map is given several iterables, the iteration stops as soon as the shortest iterable is exhausted, just like the built-in map. For example:

from concurrent.futures import ThreadPoolExecutor
import time

def return_future_result(message1, message2):
    time.sleep(1)
    return message1, message2

lst1 = ['ddddd', "cccccc", 'eeeeeeeee']
lst2 = ['1111111', '33333', '222222', '555555']

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.map(return_future_result, lst1, lst2)

print(a)
print(list(a))

# Output:
<generator object Executor.map.<locals>.result_iterator at 0x1075f0468>
[('ddddd', '1111111'), ('cccccc', '33333'), ('eeeeeeeee', '222222')]  # no '555555'

When the function takes multiple arguments, we can use functools.partial to "fix" some of them:

from concurrent.futures import ThreadPoolExecutor
import time
import functools

def return_future_result(message1, message2):
    time.sleep(1)
    return message1, message2

lst1 = ['ddddd', "cccccc", 'eeeeeeeee']

return_future_result_new = functools.partial(
    return_future_result, message2='fixed_message2')

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=2) as executor:
    a = executor.map(return_future_result_new, lst1)

print(a)
print(list(a))
# Output:
<generator object Executor.map.<locals>.result_iterator at 0x108e76f10>
[('ddddd', 'fixed_message2'), ('cccccc', 'fixed_message2'), ('eeeeeeeee', 'fixed_message2')]
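
The timeout and chunksize parameters from the signature above are not used in these examples. As a hedged sketch of how they could be used: chunksize only takes effect with ProcessPoolExecutor, where it ships the inputs to the workers in batches to cut inter-process overhead, and timeout makes iterating over the results raise concurrent.futures.TimeoutError if a result is not available in time (slow_square is just an illustrative task):

from concurrent.futures import ProcessPoolExecutor, TimeoutError
import time

def slow_square(x):
    time.sleep(0.2)
    return x * x

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as pool:
        # chunksize=10: inputs are sent to the workers in batches of 10
        # timeout=5: the result iterator raises TimeoutError if a result is not
        #            available within 5 seconds of the original map() call
        results = pool.map(slow_square, range(40), timeout=5, chunksize=10)
        try:
            print(list(results))
        except TimeoutError:
            print("timed out waiting for a result")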

2. submit

submit(fn, *args, **kwargs) schedules fn(*args, **kwargs) for execution and returns a Future object that holds the call's state and result. One thing to be careful about with submit is that exceptions raised inside the function are stored in the Future rather than raised immediately, so bugs such as failing I/O operations are easy to miss. For example:

import time
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def return_future_result(num):
    df = pd.read_csv("no_such_file_%s.csv"%(num))
    df.to_csv("no_such_file_%s.csv"%(num),index=None)

with ProcessPoolExecutor(max_workers=2) as pool:
    future1 = pool.submit(return_future_result, 666)

print(future1.done()) # True

print(future1.exception()) # File b'no_such_file_666.csv' does not exist

print(future1.result()) # raises FileNotFoundError: File b'no_such_file_666.csv' does not exist

In the example below, the errors are visible directly in the output; the output also shows that as_completed does not return results in the order of the URLS list, but yields whichever future finishes first.

Once the tasks have run, we can use try/except to catch the errors stored in the results, like this:

from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request as ur

URLS = ['https://www.python.org/', 'http://www.taobao.com',
        'http://www.baidu.com', 'http://www.cctv.com/', '### I AM A BUG ###']

def load_url(url, timeout):
    with ur.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

# Output:
'http://www.baidu.com' page is 153884 bytes
'http://www.cctv.com/' page is 298099 bytes
'### I AM A BUG ###' generated an exception: unknown url type: '### I AM A BUG ##'
'http://www.taobao.com' page is 143891 bytes
'https://www.python.org/' page is 49114 bytes

If we use ThreadPoolExecutor's map method instead, the results come back in the order of the URLS list and the code is more concise and readable. However, any exception is kept inside the result generator and only raised when we iterate over it, which makes debugging harder, so the approach above is generally preferable. A sketch of catching such an exception follows the output below.

from concurrent.futures import ThreadPoolExecutor
import urllib.request as ur

URLS = ['https://www.python.org/', 'http://www.taobao.com',
        'http://www.baidu.com', 'http://www.cctv.com/'] 

def load_url(url):
    with ur.urlopen(url, timeout=60) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
    for url, data in zip(URLS, executor.map(load_url, URLS)):
        print('%r page is %d bytes' % (url, len(data)))  # would raise here if URLS contained '### I AM A BUG ###'


# Output:
'https://www.python.org/' page is 49255 bytes
'http://www.taobao.com' page is 143891 bytes
'http://www.baidu.com' page is 153380 bytes
'http://www.cctv.com/' page is 298099 bytes
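
For completeness, if URLS did contain the '### I AM A BUG ###' entry, the exception would only surface while iterating over the map result, and it would stop the loop. A minimal sketch of catching it around the loop (same load_url as above, shortened URL list):

from concurrent.futures import ThreadPoolExecutor
import urllib.request as ur

URLS = ['https://www.python.org/', '### I AM A BUG ###', 'http://www.baidu.com']

def load_url(url):
    with ur.urlopen(url, timeout=60) as conn:
        return conn.read()

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(load_url, URLS)
    try:
        for url, data in zip(URLS, results):
            print('%r page is %d bytes' % (url, len(data)))
    except Exception as exc:
        # the stored exception is re-raised here, and the remaining results are lost
        print('iteration stopped: %s' % exc)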

The as_completed and wait methods

The concurrent.futures module also provides two frequently used functions, wait and as_completed, for working with the Future objects returned by executors.

1. as_completed

concurrent.futures.as_completed(fs, timeout=None) returns an iterator over the given Future instances that yields them as they complete, earliest first (whereas map returns results in the order of the iterable we passed in).

submit + as_completed:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

for x in as_completed(futures):
    print(x.result())

# Output:
Return of 4, sleep_time: 0
Return of 0, sleep_time: 1
Return of 3, sleep_time: 1
Return of 2, sleep_time: 2
Return of 1, sleep_time: 3

map:

from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
re_ge = pool.map(return_after_5_secs, range(5))

for i in re_ge:
    print(i)

# Output:
Return of 0, sleep_time: 3
Return of 1, sleep_time: 4
Return of 2, sleep_time: 3
Return of 3, sleep_time: 4
Return of 4, sleep_time: 4
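
as_completed also accepts a timeout; as a hedged sketch, any futures that have not finished within the timeout cause the iterator to raise concurrent.futures.TimeoutError (the sleepy task here is illustrative):

from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
import time

def sleepy(seconds):
    time.sleep(seconds)
    return seconds

pool = ThreadPoolExecutor(3)
futures = [pool.submit(sleepy, s) for s in (0.1, 0.3, 2)]

try:
    for fut in as_completed(futures, timeout=1):
        print(fut.result())  # 0.1 and 0.3 are printed before the timeout hits
except TimeoutError as exc:
    print("not all futures finished in time:", exc)

pool.shutdown()  # wait for the remaining task before exiting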

2. wait

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED) gives more control. It takes a list of Future objects and blocks until the futures reach the state specified by return_when, which can be FIRST_COMPLETED, FIRST_EXCEPTION or ALL_COMPLETED.

It returns a named tuple containing two sets: one with the futures that have completed, the other with those that have not.

With the default ALL_COMPLETED, the program blocks until every task in the pool has finished:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import time
from random import randint


def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    time.sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)


pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

t0 = time.time()
print(wait(futures))
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")

for x in as_completed(futures):
    print(x.result())

Output (return_when defaults to ALL_COMPLETED, so not_done is empty and wait blocks for a relatively long time):


DoneAndNotDoneFutures(done={<Future at 0x10a8d8240 state=finished returned str>, <Future at 0x10a856c88 state=finished returned str>, <Future at 0x10a7b8eb8 state=finished returned str>, <Future at 0x10a7bd128 state=finished returned str>, <Future at 0x10a7b81d0 state=finished returned str>}, not_done=set())
wait 4.0018
Return of 2, sleep_time: 2
Return of 3, sleep_time: 1
Return of 1, sleep_time: 4
Return of 0, sleep_time: 4
Return of 4, sleep_time: 0

With FIRST_COMPLETED, the program does not wait for every task in the pool to finish:

from concurrent.futures import ThreadPoolExecutor, wait, as_completed, FIRST_COMPLETED
import time
from random import randint


def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    time.sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)


pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
    futures.append(pool.submit(return_after_5_secs, x))

t0 = time.time()
print(wait(futures, return_when=FIRST_COMPLETED))  # using the constant rather than the raw string
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")

for x in as_completed(futures):
    print(x.result())

Output (with return_when=FIRST_COMPLETED, not_done is non-empty and wait returns almost immediately):


DoneAndNotDoneFutures(done={<Future at 0x10a8d8978 state=finished returned str>}, not_done={<Future at 0x10a856e80 state=running>, <Future at 0x10a8d8128 state=running>, <Future at 0x10a7bd048 state=running>, <Future at 0x10a8d92e8 state=running>})
wait 0.00015497 
Return of 3, sleep_time: 0
Return of 4, sleep_time: 1
Return of 0, sleep_time: 2
Return of 2, sleep_time: 2
Return of 1, sleep_time: 4
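
In practice the named tuple returned by wait can be unpacked directly, so you can process whatever is already done while the rest keeps running. A minimal sketch along those lines, reusing the task from the examples above:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time
from random import randint

def return_after_5_secs(num):
    sleep_time = randint(0, 5)
    time.sleep(sleep_time)
    return "Return of {}, sleep_time: {}".format(num, sleep_time)

pool = ThreadPoolExecutor(6)
futures = [pool.submit(return_after_5_secs, x) for x in range(5)]

done, not_done = wait(futures, return_when=FIRST_COMPLETED)
for fut in done:
    print("already finished:", fut.result())
print(len(not_done), "tasks still running")

pool.shutdown()  # block until the remaining tasks have finished too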