我在PySpark中运行了一些操作,最近在我的配置(在Amazon EMR上)中增加了节点数量。但是,即使我将节点数量增加了两倍(从4个增加到12个),性能似乎没有变化。因此,我想看看新节点对Spark是否可见。
我正在调用以下函数:
sc.defaultParallelism
>>>> 2
但我认为这是在告诉我分配给每个节点的任务总数,而不是Spark可以看到的节点总数。
如何查看PySpark在集群中使用的节点数?
发布于 2015-03-01 20:15:26
sc.defaultParallelism
只是一个提示。根据配置的不同,它可能与节点数量没有关系。如果您使用带有分区计数参数的操作,但没有提供该参数,则这是分区的数量。例如,sc.parallelize
将从列表中创建一个新的RDD。您可以用第二个参数告诉它要在RDD中创建多少个分区。但此参数的默认值为sc.defaultParallelism
。
您可以在Scala API中使用sc.getExecutorMemoryStatus
获取执行器的数量,但是在Python API中没有公开。
一般而言,建议RDD中的分区数量是执行器数量的4倍左右。这是一个很好的技巧,因为如果任务花费的时间有差异,这将使它变得平衡。例如,一些执行器将处理5个较快的任务,而另一些执行器将处理3个较慢的任务。
你不需要对此非常准确。如果你有一个大致的想法,你可以用一个估计值。比如,如果你知道你的CPU少于200个,你可以说500个分区就可以了。
因此,尝试使用以下数量的分区创建RDD:
rdd = sc.parallelize(data, 500) # If distributing local data.
rdd = sc.textFile('file.csv', 500) # If loading data from a file.
或者在计算之前重新划分RDD,如果您不控制RDD的创建:
rdd = rdd.repartition(500)
您可以使用rdd.getNumPartitions()
查看RDD中的分区数量。
发布于 2017-02-06 17:40:37
在pyspark上,你仍然可以使用pyspark的py4j桥来调用scala:
sc._jsc.sc().getExecutorMemoryStatus().size()
发布于 2018-01-05 18:14:59
应该可以使用此方法获取集群中的节点数量(类似于上面的@Dan的方法,但更短,效果更好!)。
sc._jsc.sc().getExecutorMemoryStatus().keySet().size()
https://stackoverflow.com/questions/28768642
复制相似问题