我使用pyspark来处理我的数据,最后我需要使用rdd.collect()从rdd中收集数据。然而,由于记忆问题,我的spark崩溃了。我试过很多方法,但都没成功。我现在运行以下代码,为每个分区处理一小块数据:
def make_part_filter(index):
def part_filter(split_index, iterator):
if split_index == index:
for el in iterator:
yield el
return part_filter
for part_id in range(rdd.getNumPartitions()):
part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
myCollection = part_rdd.collect()
for row in myCollection:
#Do something with each row我目前使用的新代码不会崩溃,但似乎永远在运行。
有没有更好的方法从大型rdd中收集数据?
发布于 2016-05-22 07:21:10
试图“收集”一个巨大的RDD是有问题的。"Collect“返回一个列表,这意味着整个RDD内容必须存储在驱动程序的内存中。这是一个"showstopper“问题。通常,人们希望Spark应用程序能够处理其大小远远超出单个节点内存容量的数据集。
让我们假设RDD几乎不能放入内存中,并且"collect“工作。然后我们又有了另一个“阻塞器”-低性能。在您的代码中,收集的RDD在一个循环中进行处理:"for row In myCollection“。这个循环只由一个核心执行。因此,与其通过RDD处理数据不同,RDD的计算分布在集群的所有核心中,其中可能有100个,如果不是1000个的话-相反,整个数据集上的所有工作都放在单个核心的背面。
https://stackoverflow.com/questions/37368635
复制相似问题