我在Spark 1.4.0中开始使用Spark SQL和DataFrames。我想用Scala在DataFrames上定义一个自定义的分区程序,但是不知道怎么做。
我正在使用的一个数据表包含一个按帐户划分的事务列表,类似于下面的示例。
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
至少在开始时,大多数计算将在帐户内的事务之间进行。因此,我希望对数据进行分区,以便帐户的所有事务都在相同的Spark分区中。
但我看不出有什么办法来定义它。DataFrame类有一个名为'repartition(Int)‘的方法,您可以在其中指定要创建的分区数量。但是我没有看到任何可用于为DataFrame定义自定义分区程序的方法,例如可以为RDD指定的方法。
源数据存储在Parquet中。我确实看到,在向DataFrame写入帐户时,您可以指定一个用于分区的列,因此我可以告诉Parquet按“Account”列对其数据进行分区。但是可能有数百万个帐号,如果我没理解错的话,它会为每个帐号创建一个不同的目录,所以这听起来不是一个合理的解决方案。
有没有办法让Spark对这个DataFrame进行分区,以便一个帐户的所有数据都在同一个分区中?
发布于 2015-09-30 04:26:50
所以从某种答案开始:)-你不能
我不是专家,但就我对DataFrames的理解而言,它们并不等同于rdd,而且DataFrame也没有分割器这种东西。
一般来说,DataFrame的想法是提供另一层抽象来处理这类问题。DataFrame上的查询被转换为逻辑计划,该逻辑计划被进一步转换为对RDDs的操作。您建议的分区可能会自动应用,或者至少应该自动应用。
如果你不相信SparkSQL会提供某种最优的工作,你可以按照评论中的建议将DataFrame转换为RDDRow。
发布于 2015-08-06 16:42:52
使用由返回的DataFrame:
yourDF.orderBy(account)
没有明确的方式在DataFrame上使用partitionBy
,只能在PairRDD上使用,但是当你对DataFrame排序时,它会在它的LogicalPlan中使用它,当你需要对每个帐户进行计算时,这将会有所帮助。
我只是碰巧遇到了同样的问题,我想要按帐户对数据帧进行分区。我假设,当您说“希望对数据进行分区,以便一个帐户的所有事务都在同一个Spark分区中”时,您希望它具有可扩展性和性能,但您的代码并不依赖于它(就像使用mapPartitions()
等),对吧?
发布于 2015-10-03 01:13:18
我能够使用RDD做到这一点。但我不知道这对你来说是不是一个可以接受的解决方案。一旦有了可用的DF作为RDD,就可以应用repartitionAndSortWithinPartitions
来执行自定义的数据重新分区。
下面是我使用的一个示例:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)
https://stackoverflow.com/questions/30995699
复制相似问题