当RDD的键是或包含枚举时,PairRDD
上的一些操作不能正常工作。
例如,以下星图代码将花费两周的工作日时间,并按工作日计算:
import java.time.DayOfWeek
val weekdays: Seq[(DayOfWeek, Int)] = DayOfWeek.values().map(dow => (dow, 1))
val numPartitions = 2 * weekdays.size
val result = sc
.parallelize(weekdays ++ weekdays, numPartitions)
.reduceByKey(_ + _)
.collect
.toSeq
println(result)
在输出中,我希望每个工作日(例如,MONDAY
)都有计数2,但是在我的集群中,我得到:
WrappedArray(
(THURSDAY,1), (SATURDAY,1), (WEDNESDAY,2), (SATURDAY,1),
(MONDAY,2), (TUESDAY,2), (THURSDAY,1), (FRIDAY,2), (SUNDAY,2)
)
如果您在具有单个节点(或将numPartitions
设置为1)的集群上运行此操作,则结果是正确的(即所有计数为2)。
发布于 2017-05-02 12:56:00
Spark PairRDD
的操作(如aggregateByKey()
、reduceByKey()
、combineByKey()
)使用一个可选参数来指定Spark要使用的Partitioner
。如果不显式地指定分区,则使用Spark的HashPartitioner
,它调用一行的键的hashCode()
方法,并使用它将行分配给分区。然而,如果枚举的hashCode()
在相同的Java版本上运行,则不能保证在不同JVM进程- even上的hashCode()
相同。因此,Spark xyzByKey()
操作不能正常工作。
在上面的示例中,输入中有两对(THURSDAY, 1)
,每对都在不同的执行器上处理。该示例使用具有14个(= numPartitions
)分区的numPartitions
。因为(THURSDAY, 1).hashCode() % 14
对这两个执行器产生不同的结果,所以这两行被发送到不同的执行器以减少,从而导致不正确的结果。
Bottomline:不对哈希码在不同的JVM进程上不一致的对象使用HashPartitioner
。特别是,以下对象不能保证在不同JVM进程上生成相同的哈希代码:
enum
sealed trait
-based enum的:sealed trait TraitEnum
object TEA extends TraitEnum
object TEB extends TraitEnum
abstract class
-based enum的:sealed abstract class AbstractClassEnum
object ACA extends AbstractClassEnum
object ACB extends AbstractClassEnum
hashCode()
实现)。但是,Scala case class
-based enum具有一致的哈希代码,因此可以安全地使用:
sealed case class CaseClassEnum(…) # “…" must be a non-empty list of parameters
object CCA extends CaseClassEnum(…)
object CCB extends CaseClassEnum(…)
附加信息:
https://stackoverflow.com/questions/43747083
复制