Python实现多线程/多进程,大家常常会用到标准库中的threading
和multiprocessing
模块。
但从Python3.2开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类,实现了对threading
和multiprocessing
的进一步抽象,使得开发者只需编写少量代码即可让程序实现并行计算。
concurrent.futures
模块的基础是Exectuor
抽象类(包括map
, submit
, shutdown
方法),但是它不能被直接使用。
一般会对它的两个子类ThreadPoolExecutor
和ProcessPoolExecutor
进行调用,两者分别被用来创建线程池和进程池。
当项目达到一定的规模,频繁创建/销毁进程或者线程是非常消耗资源的,这个时候我们就要编写自己的线程池/进程池,以空间换时间。
我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue
来操心死锁的问题,线程池/进程池会自动帮我们调度。
from concurrent.futures import ThreadPoolExecutor
import time
t0 = time.time()
def return_future_result(message1, message2):
s = 0
for i in range(50000000):
s *= i
return message1, message2
pool = ThreadPoolExecutor(max_workers=3) # 创建一个最大可容纳3个task的线程池
future1 = pool.submit(return_future_result, message1="hello",
message2='111') # 往线程池里面加入一个task
future2 = pool.submit(return_future_result, message2="world",
message1='222') # 往线程池里面加入一个task
print(future1.done()) # 判断task1是否结束
print(future2.done()) # 判断task2是否结束
print(future1.result()) # 查看task1返回的结果
print(future2.result()) # 查看task2返回的结果
print(future1.done()) # 判断task1是否结束
print(future2.done()) # 判断task2是否结束
t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")
输出:
False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 9.325
from concurrent.futures import ProcessPoolExecutor
import time
t0 = time.time()
def return_future_result(message1, message2):
s = 0
for i in range(50000000):
s *= i
return message1, message2
pool = ProcessPoolExecutor(max_workers=3) # 创建一个最大可容纳3个task的进程池
future1 = pool.submit(return_future_result, message1="hello",
message2='111') # 往进程池里面加入一个task
future2 = pool.submit(return_future_result, message2="world",
message1='222') # 往进程池里面加入一个task
print(future1.done()) # 判断task1是否结束
print(future2.done()) # 判断task2是否结束
print(future1.result()) # 查看task1返回的结果
print(future2.result()) # 查看task2返回的结果
print(future1.done()) # 判断task1是否结束
print(future2.done()) # 判断task2是否结束
t1 = time.time()
print(f"total time consumed: {(t1-t0):{2}.{5}}")
输出:
False
False
('hello', '111')
('222', 'world')
True
True
total time consumed: 3.7551
可以看到用进程池的时间小于线程池。由于GIL(global interpreter lock, 全局解释锁)的存在,使用多线程并不会真正意义上实现并发,使用多进程可以通过子进程的形式同时运行多个解释器,而它们的GIL是独立的,这样就可以是python程序充分利用多核CPU进行并行计算。
一般由Executor.submit()
创建,将可调用对象封装为异步执行。future是一种便利的模式用来追踪异步调用的结果。 常用的方法有done()
, result()
, exception()
, add_done_callback()
等。
ThreadPoolExecutor
和ProcessPoolExecutor
常用的方法有map
和submit
。
map(func, *iterables, timeout=None, chunksize=1)
map
方法类似于python标准库中的map方法[2]。不同的是:
•这里的可迭代对象(iterables)是被立即collect的,而不是惰性的执行返回;•func是异步执行的,可以实现并发。
需要注意的是,当func有多个参数时,如果对多个可迭代对象进行map
操作时,最短的可迭代对象耗尽时则整个迭代也会结束,似于python内置的map
方法。如下:
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message1, message2):
time.sleep(1)
return message1, message2
lst1 = ['ddddd', "cccccc", 'eeeeeeeee']
lst2 = ['1111111', '33333', '222222', '555555']
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
a = executor.map(return_future_result, lst1, lst2)
print(a)
print(list(a))
# 输出:
<generator object Executor.map.<locals>.result_iterator at 0x1075f0468>
[('ddddd', '1111111'), ('cccccc', '33333'), ('eeeeeeeee', '222222')] # no '555555'
函数有多参数的时候,我们可以通过functools.partial
来“固定”一些参数:
from concurrent.futures import ThreadPoolExecutor
import time
import functools
def return_future_result(message1, message2):
time.sleep(1)
return message1, message2
lst1 = ['ddddd', "cccccc", 'eeeeeeeee']
return_future_result_new = functools.partial(
return_future_result, message2='fixed_message2')
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
a = executor.map(return_future_result_new, lst1)
print(a)
print(list(a))
# 输出:
<generator object Executor.map.<locals>.result_iterator at 0x108e76f10>
[('ddddd', 'fixed_message2'), ('cccccc', 'fixed_message2'), ('eeeeeeeee', 'fixed_message2')]
submit(fn, *args, **kwargs)
,会调用fn(*args **kwargs)
并会返回一个Future
对象,来保存程序执行的状态和结果。 在使用submit
的过程中需要注意,一些函数内部的错误会被忽略,一些潜在的bug会不容易发现,例如有一些I/O操作出错的话,很容易被我们忽略。比如:
import time
import pandas as pd
from concurrent.futures import ProcessPoolExecutor
def return_future_result(num):
df = pd.read_csv("no_such_file_%s.csv"%(num))
df.to_csv("no_such_file_%s.csv"%(num),index=None)
with ProcessPoolExecutor(max_workers=2) as pool:
future1 = pool.submit(return_future_result, 666)
print(future1.done()) # True
print(future1.exception()) # File b'no_such_file_666.csv' does not exist
print(future1.result()) # 会报错: FileNotFoundError: File b'no_such_file_666.csv' does not exist
其中的错误,我们可以直观的从运行结果中得出。同时,从运行结果可看出,as_completed
不是按照URLS列表元素的顺序返回的,会返回先执行完的结果。
我们可以在程序执行完后,用try去catch结果中的错误,使用方法如下:
from concurrent.futures import ThreadPoolExecutor, as_completed
import urllib.request as ur
URLS = ['https://www.python.org/', 'http://www.taobao.com',
'http://www.baidu.com', 'http://www.cctv.com/', '### I AM A BUG ###']
def load_url(url, timeout):
with ur.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
# 输出:
'http://www.baidu.com' page is 153884 bytes
'http://www.cctv.com/' page is 298099 bytes
'### I AM A BUG ###' generated an exception: unknown url type: '### I AM A BUG ##'
'http://www.taobao.com' page is 143891 bytes
'https://www.python.org/' page is 49114 bytes
如果执行ThreadPoolExecutor
的map
方法,结果是按照URLS列表元素的顺序返回的,而且用map方法写出的代码更加简洁直观。但是一旦程序有异常,会保存在结果的生成器中,会增加debug的困难,所以更推荐上面的方法。
from concurrent.futures import ThreadPoolExecutor
import urllib.request as ur
URLS = ['https://www.python.org/', 'http://www.taobao.com',
'http://www.baidu.com', 'http://www.cctv.com/']
def load_url(url):
with ur.urlopen(url, timeout=60) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with ThreadPoolExecutor(max_workers=3) as executor:
for url, data in zip(URLS, executor.map(load_url, URLS)):
print('%r page is %d bytes' % (url, len(data))) # 如果URLS 中有 '### I AM A BUG ###' 则会报错
# 输出:
'https://www.python.org/' page is 49255 bytes
'http://www.taobao.com' page is 143891 bytes
'http://www.baidu.com' page is 153380 bytes
'http://www.cctv.com/' page is 298099 bytes
concurrent.futures
模块中常用的方法有wait
, as_completed
,用于处理Executors
返回的Future
对象。
concurrent.futures.as_completed(fs, timeout=None)
将Future
对象生成一个迭代器返回,并且先返回先执行完的结果(map
会按照我们传入的可迭代对象中的顺序返回)。
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep_time = randint(0, 5)
sleep(sleep_time)
return "Return of {}, sleep_time: {}".format(num, sleep_time)
pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
for x in as_completed(futures):
print(x.result())
# 输出:
Return of 4, sleep_time: 0
Return of 0, sleep_time: 1
Return of 3, sleep_time: 1
Return of 2, sleep_time: 2
Return of 1, sleep_time: 3
from concurrent.futures import ThreadPoolExecutor
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep_time = randint(0, 5)
sleep(sleep_time)
return "Return of {}, sleep_time: {}".format(num, sleep_time)
pool = ThreadPoolExecutor(6)
re_ge = pool.map(return_after_5_secs, [x for x in range(5)])
for i in re_ge:
print(i)
# 输出:
Return of 0, sleep_time: 3
Return of 1, sleep_time: 4
Return of 2, sleep_time: 3
Return of 3, sleep_time: 4
Return of 4, sleep_time: 4
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
有更大的自由度,它将一个Future
列表作为参数,并阻塞程序执行,最终会根据return_when
等待Future
对象完成到某个状态才返回结果,return_when
可选参数有:FIRST_COMPLETED
, FIRST_EXCEPTION
和 ALL_COMPLETED
。
返回结果为一个包含两个集合的named tuple
,其中一个集合包含完成的,另一个为未完成的。
ALL_COMPLETED
,程序会阻塞直到线程池里面的所有任务都完成:from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import time
from random import randint
def return_after_5_secs(num):
sleep_time = randint(0, 5)
time.sleep(sleep_time)
return "Return of {}, sleep_time: {}".format(num, sleep_time)
pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
t0 = time.time()
print(wait(futures))
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")
for x in as_completed(futures):
print(x.result())
return_when='ALL_COMPLETED'
,not_done无元素,wait的时间较长
DoneAndNotDoneFutures(done={<Future at 0x10a8d8240 state=finished returned str>, <Future at 0x10a856c88 state=finished returned str>, <Future at 0x10a7b8eb8 state=finished returned str>, <Future at 0x10a7bd128 state=finished returned str>, <Future at 0x10a7b81d0 state=finished returned str>}, not_done=set())
wait 4.0018
Return of 2, sleep_time: 2
Return of 3, sleep_time: 1
Return of 1, sleep_time: 4
Return of 0, sleep_time: 4
Return of 4, sleep_time: 0
FIRST_COMPLETED
参数,程序并不会等到线程池里面所有的任务都完成:from concurrent.futures import ThreadPoolExecutor, wait, as_completed
import time
from random import randint
def return_after_5_secs(num):
sleep_time = randint(0, 5)
time.sleep(sleep_time)
return "Return of {}, sleep_time: {}".format(num, sleep_time)
pool = ThreadPoolExecutor(6)
futures = []
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
t0 = time.time()
print(wait(futures, return_when='FIRST_COMPLETED'))
t1 = time.time()
print(f"wait {t1-t0:{2}.{5}}")
for x in as_completed(futures):
print(x.result())
return_when='FIRST_COMPLETED'
,not_done有元素,wait的时间很短
DoneAndNotDoneFutures(done={<Future at 0x10a8d8978 state=finished returned str>}, not_done={<Future at 0x10a856e80 state=running>, <Future at 0x10a8d8128 state=running>, <Future at 0x10a7bd048 state=running>, <Future at 0x10a8d92e8 state=running>})
wait 0.00015497
Return of 3, sleep_time: 0
Return of 4, sleep_time: 1
Return of 0, sleep_time: 2
Return of 2, sleep_time: 2
Return of 1, sleep_time: 4