所以我在Google DataProc中运行了一个5节点的集群,每个节点有16个核心。
假设在1000行上应用一个简单的函数在50秒内完成。
rows = df.limit(1000).collect()
[func(row) for row in rows] # runs ~50secs在我的假设中,如果我充分利用集群中的所有内核,那么我的运行时间将大致为:
total_cores = n_nodes * (n_core_per_node - 1)
total_cores = 5 * 15 = 75
50secs / 75cores = 0.667secs runtime across 1000 rows因此,我将df划分为75,这样每个executor中就有15个分区。由于每个执行程序都有15个核心,因此每个分区都会有自己的核心:
conf = spark.sparkContext._conf.setAll([
    ('spark.executor.cores', '15'), 
    ('spark.executor.instances', '5')
])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df.limit(1000).repartition(75).foreachPartition(func) # runs ~7secs但是我没有得到我所期望的结果(运行时间大约为0.667秒)。
我遗漏了什么?
提前感谢
发布于 2021-09-06 18:30:27
documentation声明了以下内容:
Spark属性主要可以分为两类:一类是与deploy相关的属性,如
spark.driver.memory、spark.executor.instances,这类属性在运行时通过SparkConf进行编程设置时可能不会受到影响,或者其行为取决于您选择的集群管理器和部署模式,因此建议通过配置文件或spark-submit命令行选项进行设置
您的代码似乎是在运行时设置配置。按照他们的建议,尝试通过配置文件或命令行设置它。
https://stackoverflow.com/questions/69078737
复制相似问题