我使用pyspark来处理我的数据,最后我需要使用rdd.collect()从rdd中收集数据。然而,由于记忆问题,我的spark崩溃了。我试过很多方法,但都没成功。我现在运行以下代码,为每个分区处理一小块数据:
def make_part_filter(index):
def part_filter(split_index, iterator):
if split_index == index:
for el in iterator:
yield el
return part_filter
for part_id in range(rdd.getNumPartitions()):
part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
myCollection = part_rdd.collect()
for row in myCollection:
#Do something with each row我目前使用的新代码不会崩溃,但似乎永远在运行。
有没有更好的方法从大型rdd中收集数据?
发布于 2016-05-22 07:21:10
试图“收集”一个巨大的RDD是有问题的。"Collect“返回一个列表,这意味着整个RDD内容必须存储在驱动程序的内存中。这是一个"showstopper“问题。通常,人们希望Spark应用程序能够处理其大小远远超出单个节点内存容量的数据集。
让我们假设RDD几乎不能放入内存中,并且"collect“工作。然后我们又有了另一个“阻塞器”-低性能。在您的代码中,收集的RDD在一个循环中进行处理:"for row In myCollection“。这个循环只由一个核心执行。因此,与其通过RDD处理数据不同,RDD的计算分布在集群的所有核心中,其中可能有100个,如果不是1000个的话-相反,整个数据集上的所有工作都放在单个核心的背面。
发布于 2017-04-18 08:45:01
我不知道这是不是最好的方法,但这是我试过的最好的方法。不知道它是比你的更好还是更差。同样的想法,将其分成块,但您可以更灵活地处理块大小。
def rdd_iterate(rdd, chunk_size=1000000):
indexed_rows = rdd.zipWithIndex().cache()
count = indexed_rows.count()
print("Will iterate through RDD of count {}".format(count))
start = 0
end = start + chunk_size
while start < count:
print("Grabbing new chunk: start = {}, end = {}".format(start, end))
chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect()
for row in chunk:
yield row[0]
start = end
end = start + chunk_size示例用法是,我想要将一个巨大的RDD附加到磁盘上的CSV文件中,而不是用整个RDD填充Python列表:
def rdd_to_csv(fname, rdd):
import csv
f = open(fname, "a")
c = csv.writer(f)
for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD
c.writerows([row])
f.close()
rdd_to_csv("~/test.csv", my_really_big_rdd)https://stackoverflow.com/questions/37368635
复制相似问题