首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >TypeError:无法pickle生成器对象:由于无法序列化生成器返回类型(dict_key),Spark collect()失败

TypeError:无法pickle生成器对象:由于无法序列化生成器返回类型(dict_key),Spark collect()失败
EN

Stack Overflow用户
提问于 2019-02-26 14:36:34
回答 1查看 2.3K关注 0票数 2

我有一个库函数,它返回一个包含生成器的复合对象,它不能被pickle (尝试pickle会生成错误TypeError: can't pickle dict_keys objects)。

当我尝试通过Spark并行化时,由于pickle失败(nb.通过具有默认sc的DataBricks运行)。

这是一个最小的重现:

代码语言:javascript
运行
复制
test_list = [{"a": 1, "b": 2, "c": 3}, 
             {"a": 7, "b": 3, "c": 5}, 
             {"a": 2, "b": 3, "c": 4}, 
             {"a": 9, "b": 8, "c": 7}]

parallel_test_list = sc.parallelize(test_list)

parallel_results = parallel_test_list.map(lambda x: x.keys())

local_results = parallel_results.collect()

我收到的堆栈跟踪很长,我认为相关部分是:

代码语言:javascript
运行
复制
Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/worker.py", line 403, in main
        process()
      File "/databricks/spark/python/pyspark/worker.py", line 398, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/databricks/spark/python/pyspark/serializers.py", line 418, in dump_stream
        bytes = self.serializer.dumps(vs)
      File "/databricks/spark/python/pyspark/serializers.py", line 597, in dumps
        return pickle.dumps(obj, protocol)
    TypeError: can't pickle dict_keys objects

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:490)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:626)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:609)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-27 00:11:02

您可以编写一个递归助手函数来“消费”所有嵌套的生成器对象,并使用此函数map rdd中的所有行。

例如,下面是一个将嵌套生成器转换为list的函数:

代码语言:javascript
运行
复制
from inspect import isgenerator, isgeneratorfunction

def consume_all_generators(row):

    if isinstance(row, str):
        return row
    elif isinstance(row, dict):
        return {k: consume_all_generators(v) for k, v in row.items()}

    output = []
    try:
        for val in row:
            if isgenerator(val) or isgeneratorfunction(val):
                output.append(list(consume_all_generators(val)))
            else:
                output.append(consume_all_generators(val))
        return output
    except TypeError:
        return row

现在在collect之前调用map(consume_all_generators)

代码语言:javascript
运行
复制
local_results = parallel_results.map(consume_all_generators).collect()
print(local_results)
#[['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b'], ['a', 'c', 'b']]
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54879691

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档