好的,让我首先描述一下我是如何创建Dataframe的,以及其中包含了什么内容。
我有一组gziped HTML文档和一组指向这些HTML文档的gziped元数据
对于这两种情况,我提供了一个指向RDDs的路径列表,如下所示:
Wet_Paths_RDD = sc.parallelize(Wet_Paths)
Wet_RDD = Wet_Paths_RDD.map(open_wet_filelist).flatMap(split_wetfiles)我以如下所示的方式准备两个RDDs:
(k,(some,other,values))然后,我将元数据RDD与content RDD连接在一起,如下所示:
Wat_Wet_RDD = Wat_RDD.join(Wet_RDD)然后我解开目前相对复杂的元组,并进行语言检测等操作。我必须连接RDDs,因为到目前为止,我所有的字符串都表示为byte strings,不能在Dataframe中表示。
Wat_Wet_RDD = Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)然后,我将连接的RDD转换为Dataframe
wat_wet_schema = StructType([
StructField("URI", StringType(), True),
StructField("Links", StringType(), True),
StructField("N_Links", IntegerType(), True),
StructField("Content_type", StringType(), True),
StructField("Original_Encoding", StringType(), True),
StructField("Content", StringType(), True),
StructField("Language", StringType(), True),
StructField("Language_confidence", IntegerType(), True),
])
WatWet_DF = sqlContext.createDataFrame(Wat_Wet_RDD, schema=wat_wet_schema)并使用以下命令查看它:
print(WatWet_DF.show(20))到目前为止,一切都需要24分钟,但下一步:
print(WatWet_DF.groupBy(WatWet_DF.Language).count().orderBy(desc('count')).show(100))我在24小时后放弃了,在这个阶段没有解决任何任务。
目前,我在一台测试linux虚拟机上运行集群。VM有4个核心,同时运行Master和Worker。工人有4个执行器,每个执行器有3.5G的内存。Dataframe应该由大约一百万行组成。Apache Spark版本是2.1.0,使用的是python 3.5。虚拟机运行在过时的至强W3680 6(V12)内核之上,内存为24G。
发布于 2017-01-17 23:07:58
好了,我找到了为什么.count()和.groupBy()在这个数据集上比.show()花了这么长的时间。原因是,为了让.count()和.groupBy()提供所有结果,在这里Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)映射阶段执行的函数需要应用于整个数据集。为了让.show()提供结果,只需将这些函数应用于整个数据集的一个子集,就可以更快地提供结果。现在是映射阶段,Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)中有一些非常昂贵的函数,导致计算时间非常长,特别是在将.count()和.groupBy()与.show()进行比较时。
https://stackoverflow.com/questions/41611900
复制相似问题