I have a library function that returns a compound object containing generators, which cannot be pickled (attempting to pickle it raises TypeError: can't pickle dict_keys objects).
When I try to parallelize it with Spark, it fails at the pickling step (n.b. running on Databricks with the default sc).
Here is a minimal reproduction:
test_list = [{"a": 1, "b": 2, "c": 3},
{"a": 7, "b": 3, "c": 5},
{"a": 2, "b": 3, "c": 4},
{"a": 9, "b": 8, "c": 7}]
parallel_test_list = sc.parallelize(test_list)
parallel_results = parallel_test_list.map(lambda x: x.keys())
local_results = parallel_results.collect()
The stack trace I get is long; I think the relevant part is:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 403, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 398, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/databricks/spark/python/pyspark/serializers.py", line 418, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "/databricks/spark/python/pyspark/serializers.py", line 597, in dumps
    return pickle.dumps(obj, protocol)
TypeError: can't pickle dict_keys objects
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
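The error itself comes from pickle rather than Spark: dict_keys is a dictionary view object that the standard pickle module cannot serialize. As a quick sanity check (not part of the original post), the same failure reproduces locally without Spark at all:
import pickle

d = {"a": 1, "b": 2, "c": 3}
pickle.dumps(d.keys())  # raises TypeError; exact wording varies by Python version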
Posted on 2019-02-27 00:11:02
You can write a recursive helper function that "consumes" all nested generator objects, and map it over every row of the rdd.
For example, here is a function that converts nested generators into a list:
from inspect import isgenerator, isgeneratorfunction

def consume_all_generators(row):
    # Strings are iterable but should be returned unchanged.
    if isinstance(row, str):
        return row
    # Recurse into dict values.
    elif isinstance(row, dict):
        return {k: consume_all_generators(v) for k, v in row.items()}

    # Anything else: try to iterate, materialising generators into lists.
    output = []
    try:
        for val in row:
            if isgenerator(val) or isgeneratorfunction(val):
                output.append(list(consume_all_generators(val)))
            else:
                output.append(consume_all_generators(val))
        return output
    except TypeError:
        # Not iterable (e.g. an int): return the value as-is.
        return row
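As a quick local check (not from the original answer, and using a made-up input gen_row), applying the helper to a plain dict whose value is a generator shows it materialising the generator into a list:
gen_row = {"a": (i * i for i in range(3)), "b": "hello"}
print(consume_all_generators(gen_row))
# {'a': [0, 1, 4], 'b': 'hello'}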
Now call map(consume_all_generators) before collect:
local_results = parallel_results.map(consume_all_generators).collect()
print(local_results)
#[['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b']]
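As a side note (not part of the original answer), for the minimal reproduction above the same result can also be obtained by materialising the dict_keys view inside the map itself, since a plain list pickles fine:
local_results = parallel_test_list.map(lambda x: list(x.keys())).collect()
The recursive helper is mainly useful when the generators are buried at arbitrary depth inside the objects returned by the library, where converting each one by hand would be impractical.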
https://stackoverflow.com/questions/54879691