我希望concurrent.futures.ProcessPoolExecutor.map()
调用一个由2个或多个参数组成的函数。在下面的示例中,我使用了lambda
函数,并将ref
定义为与具有相同值的numberlist
大小相等的数组。
第一个问题:有更好的方法吗?在编号列表的大小可能是百万到十亿元素的情况下,引用的大小必须遵循编号列表,这种方法不必要地占用了宝贵的内存,这是我想避免的。我这样做是因为我读取map
函数将终止其映射,直到到达最短的数组结束为止。
import concurrent.futures as cf
nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = {0}'.format(x))
return x
a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
print(n)
if str(ref[0]) in n:
print('match')
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
print(type(n))
print(n)
if str(ref[0]) in n:
print('match')
运行上面的代码,我发现map
函数能够实现我想要的结果。但是,当我将相同的术语转移到concurrent.futures.ProcessPoolExecutor.map()时,python3.5失败了,出现了以下错误:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
obj = ForkingPickler.dumps(obj)
File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
问题2:为什么会发生此错误,以及如何使concurrent.futures.ProcessPoolExecutor.map()调用具有多个参数的函数?
发布于 2017-02-05 20:33:25
要首先回答第二个问题,您将得到一个异常,因为像您使用的lambda
函数是不可选择的。由于Python使用pickle
协议来序列化主进程和ProcessPoolExecutor
的辅助进程之间传递的数据,这是一个问题。根本不清楚为什么要使用lambda
。您所使用的lambda有两个参数,就像原始函数一样。您可以直接使用_findmatch
而不是lambda
,它应该可以工作。
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_findmatch, numberlist, ref):
...
至于第一个问题,关于传递第二个常量参数而不创建一个庞大的列表,您可以通过几种方法解决这个问题。一种方法可能是使用itertools.repeat
创建一个可迭代对象,在迭代时永远重复相同的值。
但是更好的方法可能是编写一个额外的函数,为您传递常量参数。(也许这就是您尝试使用lambda
函数的原因?)如果您使用的函数在模块的顶级命名空间中是可访问的,那么它应该可以工作:
def _helper(x):
return _findmatch(x, 5)
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_helper, numberlist):
...
发布于 2017-02-05 20:34:51
(1)无须列明名单。您可以使用itertools.repeat
创建一个迭代器,该迭代器只重复某些值。
(2)您需要将一个命名函数传递给map
,因为它将传递给子进程执行。map
使用泡菜协议发送东西,lambdas不能被腌制,因此它们不能成为映射的一部分。但这完全没有必要。您的lambda所做的就是调用带有2个参数的2参数函数。把它完全移开。
工作代码是
import concurrent.futures as cf
import itertools
nmax = 10
numberlist = range(nmax)
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = {0}'.format(x))
return x
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
print(type(n))
print(n)
#if str(ref[0]) in n:
# print('match')
发布于 2017-02-05 20:32:19
关于第一个问题,我是否正确地理解到,您希望传递一个参数,其值仅在调用map
时确定,而对于映射函数的所有实例都是常数?如果是这样的话,我将使用一个从带有第二个参数的“模板函数”派生的函数来执行map
(在您的示例中是ref
),它使用functools.partial
进行烘焙。
from functools import partial
refval = 5
def _findmatch(ref, listnumber): # arguments swapped
...
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(partial(_findmatch, refval), numberlist):
...
Re.问题2,第一部分:我还没有找到正确的代码片段,试图选择(序列化)然后并行执行的函数,但这听起来很自然--不仅是参数,而且函数也必须以某种方式传递给工作人员,而且为了实现这种传输,很可能必须序列化它。partial
函数可以在其他地方(例如:https://stackoverflow.com/a/19279016/6356764 )中提到,而lambda
的函数可以被腌制。
Re.问题2,第二部分:如果您想在ProcessPoolExecutor.map
中调用一个具有多个参数的函数,您可以将它作为第一个参数传递给它,然后为该函数传递一个可迭代的第一个参数,然后再传递它的第二个参数的可迭代性等等:
for n in executor.map(_findmatch, numberlist, ref):
...
https://stackoverflow.com/questions/42056738
复制相似问题