
Splitting a large dataframe into multiple smaller dataframes

Stack Overflow user
Asked on 2019-07-27 02:26:20
Answers: 3 · Views: 106 · Followers: 0 · Votes: 1

I have a large dataframe with 100+ columns, where groups of columns share the same name plus a unique number. I want to create multiple smaller dataframes based on this unique number.

Yes, the column names follow the same pattern, and the number of such groups can sometimes be 64 and sometimes 128: net1, net2, net3...net64...net128

I need 64 or 128 sub-dataframes. I can't use startsWith, because a column name like net1 would also match net10, net11...net100, net101...
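
For illustration, the over-matching is easy to reproduce in plain Scala, and matching the whole name against an anchored pattern (one possible workaround) avoids it:

val cols = Seq("net1", "net1_a", "net10", "net10_a", "net100")

// startsWith("net1") also picks up net10, net10_a and net100:
cols.filter(_.startsWith("net1"))
// => List(net1, net1_a, net10, net10_a, net100)

// Matching the full column name against an anchored pattern does not:
cols.filter(_.matches("net1(_.*)?"))
// => List(net1, net1_a)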

I have already built a solution in Spark + Scala that works fine, but I feel there must be a simpler way to do this dynamically.

df_net.printSchema()

|-- net1: string (nullable = true)
|-- net1_a: integer (nullable = true)
|-- net1_b: integer (nullable = true)
|-- net1_c: integer (nullable = true)
|-- net1_d: integer (nullable = true)
|-- net1_e: integer (nullable = true)
|-- net2: string (nullable = true)
|-- net2_a: integer (nullable = true)
|-- net2_b: integer (nullable = true)
|-- net2_c: integer (nullable = true)
|-- net2_d: integer (nullable = true)
|-- net2_e: integer (nullable = true)
|-- net3: string (nullable = true)
|-- net3_a: integer (nullable = true)
|-- net3_b: integer (nullable = true)
|-- net3_c: integer (nullable = true)
|-- net3_d: integer (nullable = true)
|-- net3_e: integer (nullable = true)
|-- net4: string (nullable = true)
|-- net4_a: integer (nullable = true)
|-- net4_b: integer (nullable = true)
|-- net4_c: integer (nullable = true)
|-- net4_d: integer (nullable = true)
|-- net4_e: integer (nullable = true)
|-- net5: string (nullable = true)
|-- net5_a: integer (nullable = true)
|-- net5_b: integer (nullable = true)
|-- net5_c: integer (nullable = true)
|-- net5_d: integer (nullable = true)
|-- net5_e: integer (nullable = true)
val df_net1 = df_net
  .filter($"net1".isNotNull)
  .select("net1", "net1_a", "net1_b", "net1_c", "net1_d", "net1_e")

val df_net2 = df_net
  .filter($"net2".isNotNull)
  .select("net2", "net2_a", "net2_b", "net2_c", "net2_d", "net2_e")

val df_net3 = df_net
  .filter($"net3".isNotNull)
  .select("net3", "net3_a", "net3_b", "net3_c", "net3_d", "net3_e")

Each smaller dataframe is then filtered on its unique number.


3 Answers

Stack Overflow user

Posted on 2019-07-27 03:38:35

Assuming your DF splits predictably into groups of 6 columns, the following will generate an Iterator[Dataset] where each element contains 6 columns from the parent dataset:

scala> df.printSchema
root
 |-- net1: string (nullable = false)
 |-- net1_a: integer (nullable = false)
 |-- net1_b: integer (nullable = false)
 |-- net1_c: integer (nullable = false)
 |-- net1_d: integer (nullable = false)
 |-- net1_e: integer (nullable = false)
 |-- net2: string (nullable = false)
 |-- net2_a: integer (nullable = false)
 |-- net2_b: integer (nullable = false)
 |-- net2_c: integer (nullable = false)
 |-- net2_d: integer (nullable = false)
 |-- net2_e: integer (nullable = false)
 |-- net3: string (nullable = false)
 |-- net3_a: integer (nullable = false)
 |-- net3_b: integer (nullable = false)
 |-- net3_c: integer (nullable = false)
 |-- net3_d: integer (nullable = false)
 |-- net3_e: integer (nullable = false)
 |-- net4: string (nullable = false)
 |-- net4_a: integer (nullable = false)
 |-- net4_b: integer (nullable = false)
 |-- net4_c: integer (nullable = false)
 |-- net4_d: integer (nullable = false)
 |-- net4_e: integer (nullable = false)
 |-- net5: string (nullable = false)
 |-- net5_a: integer (nullable = false)
 |-- net5_b: integer (nullable = false)
 |-- net5_c: integer (nullable = false)
 |-- net5_d: integer (nullable = false)
 |-- net5_e: integer (nullable = false)

scala> val sub_dfs = df.schema.map(_.name).grouped(6).map{fields => df.select(fields.map(col): _*).where(col(fields.head).isNotNull)}
sub_dfs: Iterator[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = non-empty iterator

scala> sub_dfs.foreach{_.printSchema}
root
 |-- net1: string (nullable = false)
 |-- net1_a: integer (nullable = false)
 |-- net1_b: integer (nullable = false)
 |-- net1_c: integer (nullable = false)
 |-- net1_d: integer (nullable = false)
 |-- net1_e: integer (nullable = false)

root
 |-- net2: string (nullable = false)
 |-- net2_a: integer (nullable = false)
 |-- net2_b: integer (nullable = false)
 |-- net2_c: integer (nullable = false)
 |-- net2_d: integer (nullable = false)
 |-- net2_e: integer (nullable = false)

root
 |-- net3: string (nullable = false)
 |-- net3_a: integer (nullable = false)
 |-- net3_b: integer (nullable = false)
 |-- net3_c: integer (nullable = false)
 |-- net3_d: integer (nullable = false)
 |-- net3_e: integer (nullable = false)

root
 |-- net4: string (nullable = false)
 |-- net4_a: integer (nullable = false)
 |-- net4_b: integer (nullable = false)
 |-- net4_c: integer (nullable = false)
 |-- net4_d: integer (nullable = false)
 |-- net4_e: integer (nullable = false)

root
 |-- net5: string (nullable = false)
 |-- net5_a: integer (nullable = false)
 |-- net5_b: integer (nullable = false)
 |-- net5_c: integer (nullable = false)
 |-- net5_d: integer (nullable = false)
 |-- net5_e: integer (nullable = false)
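
If the groups are ever not contiguous runs of exactly six columns, grouping by the shared prefix is one possible variant; a sketch, assuming every column name is either netN or netN_x:

import org.apache.spark.sql.functions.col

// Group columns by their netN prefix instead of fixed-size runs of six
val byPrefix = df.columns
  .groupBy(_.split("_")(0))          // "net1_a" -> "net1", "net1" -> "net1"
  .toSeq
  .sortBy { case (prefix, _) => prefix.stripPrefix("net").toInt }

val sub_dfs2 = byPrefix.map { case (prefix, fields) =>
  df.select(fields.map(col): _*).where(col(prefix).isNotNull)
}
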
Votes: 2

Stack Overflow user

Posted on 2019-07-27 04:07:41

The columns in your dataframe seem to follow a pattern, since they start with a common string. If that will not change, you can use something like the following.

val df_net1 = df.select(df.columns.filter(a => a.startsWith("net1")).map(a => df(a)): _*)

val df_net2 = df.select(df.columns.filter(a => a.startsWith("net2")).map(a => df(a)): _*)

val df_net3 = df.select(df.columns.filter(a => a.startsWith("net3")).map(a => df(a)): _*)
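
One caveat from the question: startsWith("net1") also matches net10, net100 and so on once there are more than nine groups. A sketch of a stricter variant that matches the full column name instead:

// Select only "net1" and "net1_<suffix>" columns, not "net10"/"net100"
val df_net1 = df.select(df.columns.filter(_.matches("net1(_.*)?")).map(a => df(a)): _*)
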
Votes: 1

Stack Overflow user

Posted on 2019-07-27 05:10:17

I would combine your different net groups into a single set of net_type fields. Then you can do a partitioned write, which lets you easily load a single set, or as many sets as you need.

This gives you several benefits:

  • Aggregations such as counting by type, or across types, become easy to do later if you need them.
  • You can load one set or any number of subsets.
  • Spark will automatically decide which files to load for you, based on the values you filter on in net_type.
  • All output files are written by Spark in a single write, instead of one write per group.
Here is the code to do this:

import org.apache.spark.sql.functions._
import spark.implicits._  // needed for .toDS outside the spark-shell

case class Net(net1: Integer,
               net1_a: Integer,
               net1_b: Integer,
               net2: Integer,
               net2_a: Integer,
               net2_b: Integer)

val df = Seq(
    Net(1, 1, 1, null, null, null),
    Net(2, 2, 2, null, null, null),
    Net(null, null, null, 3, 3, 3)
).toDS

// You could find these automatically if you wanted
val columns = Seq("net1", "net2")

// Turn each group of fields into a struct with a populated "net_type" field
val structColumns = columns.map(c =>
    when(col(c).isNotNull,
        struct(
            lit(c) as "net_type",
            col(c) as "net",
            col(c + "_a") as "net_a",
            col(c + "_b") as "net_b"
        )
    )
)

// Put into one column the populated group for each row
val df2 = df.select(coalesce(structColumns: _*) as "net")

// Flatten back down to top-level fields instead of being in a struct
val df3 = df2.selectExpr("net.*")

// Write df3 (the frame that actually has net_type), partitioned by net_type
df3.write.partitionBy("net_type").parquet("/some/file/path.parquet")

This will give you rows that look like this:

scala> df3.show
+--------+---+-----+-----+
|net_type|net|net_a|net_b|
+--------+---+-----+-----+
|    net1|  1|    1|    1|
|    net1|  2|    2|    2|
|    net2|  3|    3|    3|
+--------+---+-----+-----+

And files on the filesystem that look like this:

/some/file/path.parquet/
    net_type=net1/
        part1.parquet
        ..
    net_type=net2/
        part1.parquet
        ..
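
Reading back a single group then scans only that partition's directory; a minimal sketch, assuming the SparkSession spark and the functions._ import from above, and reusing the same placeholder path:

// Partition pruning: Spark reads only the net_type=net1 directory
val net1 = spark.read
  .parquet("/some/file/path.parquet")
  .where(col("net_type") === "net1")
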
Votes: 0
The original content of this page is provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/57225324
