我有一个Spark应用程序来执行一个大连接
val joined = uniqueDates.join(df, $"start_date" <= $"date" && $"date" <= $"end_date")
然后将产生的DataFrame聚合到一个有13k行的行。在联接过程中,作业失败,出现以下错误消息:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 78021 tasks is bigger than spark.driver.maxResultSize (2.0 GB)
这是在没有设置spark.driver.maxResultSize
的情况下发生的,所以我设置了spark.driver.maxResultSize=2G
。然后,我对join条件做了轻微的更改,错误再次出现。
编辑:在调整集群的大小时,我还将DataFrame在.coalesce(256)
中假设的分区数量增加了一倍,成为.coalesce(512)
,所以我不能确定这不是因为这一点。
我的问题是,既然我没有收集任何东西给司机,为什么spark.driver.maxResultSize
应该在这里重要?驱动程序的内存是否用于连接中我不知道的内容?
发布于 2018-09-16 09:47:14
仅仅因为你没有明确地收集任何东西,并不意味着什么都不收集。由于问题发生在连接过程中,最可能的解释是执行计划使用广播连接。在这种情况下,星火将首先收集数据,然后广播。
取决于配置和管道:
spark.sql.autoBroadcastJoinThreshold
小于spark.driver.maxResultSize
。若要确定广播是否确实是问题所在,请检查执行计划,如果需要,请删除广播提示并禁用自动广播:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
发布于 2018-09-18 15:30:22
理论上,异常并不总是与客户数据相关。
有关任务执行结果的技术信息以序列化的形式发送到驱动节点,这些信息可以占用更多的内存,而不是阈值。
org.apache.spark.scheduler.TaskSetManager#canFetchMoreResults中的错误消息
val msg = s"Total size of serialized results of ${calculatedTasks} tasks " +
方法在org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask中调用。
val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
case directResult: DirectTaskResult[_] =>
if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
return
}
如果任务数量很大,就会出现上述异常。
https://stackoverflow.com/questions/42774845
复制相似问题