首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何定义DataFrame的分区?

如何定义DataFrame的分区?
EN

Stack Overflow用户
提问于 2015-06-23 14:48:23
回答 3查看 186.6K关注 0票数 141

我在Spark 1.4.0中开始使用Spark SQL和DataFrames。我想用Scala在DataFrames上定义一个自定义的分区程序,但是不知道怎么做。

我正在使用的一个数据表包含一个按帐户划分的事务列表,类似于下面的示例。

代码语言:javascript
复制
Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

至少在开始时,大多数计算将在帐户内的事务之间进行。因此,我希望对数据进行分区,以便帐户的所有事务都在相同的Spark分区中。

但我看不出有什么办法来定义它。DataFrame类有一个名为'repartition(Int)‘的方法,您可以在其中指定要创建的分区数量。但是我没有看到任何可用于为DataFrame定义自定义分区程序的方法,例如可以为RDD指定的方法。

源数据存储在Parquet中。我确实看到,在向DataFrame写入帐户时,您可以指定一个用于分区的列,因此我可以告诉Parquet按“Account”列对其数据进行分区。但是可能有数百万个帐号,如果我没理解错的话,它会为每个帐号创建一个不同的目录,所以这听起来不是一个合理的解决方案。

有没有办法让Spark对这个DataFrame进行分区,以便一个帐户的所有数据都在同一个分区中?

EN

回答 3

Stack Overflow用户

发布于 2015-09-30 04:26:50

所以从某种答案开始:)-你不能

我不是专家,但就我对DataFrames的理解而言,它们并不等同于rdd,而且DataFrame也没有分割器这种东西。

一般来说,DataFrame的想法是提供另一层抽象来处理这类问题。DataFrame上的查询被转换为逻辑计划,该逻辑计划被进一步转换为对RDDs的操作。您建议的分区可能会自动应用,或者至少应该自动应用。

如果你不相信SparkSQL会提供某种最优的工作,你可以按照评论中的建议将DataFrame转换为RDDRow。

票数 8
EN

Stack Overflow用户

发布于 2015-08-06 16:42:52

使用由返回的DataFrame:

代码语言:javascript
复制
yourDF.orderBy(account)

没有明确的方式在DataFrame上使用partitionBy,只能在PairRDD上使用,但是当你对DataFrame排序时,它会在它的LogicalPlan中使用它,当你需要对每个帐户进行计算时,这将会有所帮助。

我只是碰巧遇到了同样的问题,我想要按帐户对数据帧进行分区。我假设,当您说“希望对数据进行分区,以便一个帐户的所有事务都在同一个Spark分区中”时,您希望它具有可扩展性和性能,但您的代码并不依赖于它(就像使用mapPartitions()等),对吧?

票数 7
EN

Stack Overflow用户

发布于 2015-10-03 01:13:18

我能够使用RDD做到这一点。但我不知道这对你来说是不是一个可以接受的解决方案。一旦有了可用的DF作为RDD,就可以应用repartitionAndSortWithinPartitions来执行自定义的数据重新分区。

下面是我使用的一个示例:

代码语言:javascript
复制
class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
票数 6
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30995699

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档