I have a larger DataFrame with 100+ columns, in which groups of columns share the same name plus a unique number. I want to create multiple smaller DataFrames based on that unique number.
Yes, the column names follow the same pattern, and the number of such groups is sometimes 64 and sometimes 128: net1, net2, net3 ... net64 ... net128.
So I need 64 or 128 sub-DataFrames. I cannot use startsWith, because a prefix like net1 would also match net10, net11 ... net100, net101 ...
I have already built a solution in Spark + Scala and it works fine, but I feel there must be a simpler way to do this dynamically.
df_net.printSchema()
|-- net1: string (nullable = true)
|-- net1_a: integer (nullable = true)
|-- net1_b: integer (nullable = true)
|-- net1_c: integer (nullable = true)
|-- net1_d: integer (nullable = true)
|-- net1_e: integer (nullable = true)
|-- net2: string (nullable = true)
|-- net2_a: integer (nullable = true)
|-- net2_b: integer (nullable = true)
|-- net2_c: integer (nullable = true)
|-- net2_d: integer (nullable = true)
|-- net2_e: integer (nullable = true)
|-- net3: string (nullable = true)
|-- net3_a: integer (nullable = true)
|-- net3_b: integer (nullable = true)
|-- net3_c: integer (nullable = true)
|-- net3_d: integer (nullable = true)
|-- net3_e: integer (nullable = true)
|-- net4: string (nullable = true)
|-- net4_a: integer (nullable = true)
|-- net4_b: integer (nullable = true)
|-- net4_c: integer (nullable = true)
|-- net4_d: integer (nullable = true)
|-- net4_e: integer (nullable = true)
|-- net5: string (nullable = true)
|-- net5_a: integer (nullable = true)
|-- net5_b: integer (nullable = true)
|-- net5_c: integer (nullable = true)
|-- net5_d: integer (nullable = true)
|-- net5_e: integer (nullable = true)
val df_net1 = df_net
  .filter($"net1".isNotNull)
  .select("net1", "net1_a", "net1_b", "net1_c", "net1_d", "net1_e")
val df_net2 = df_net
  .filter($"net2".isNotNull)
  .select("net2", "net2_a", "net2_b", "net2_c", "net2_d", "net2_e")
val df_net3 = df_net
  .filter($"net3".isNotNull)
  .select("net3", "net3_a", "net3_b", "net3_c", "net3_d", "net3_e")
Filter the smaller DataFrames by the unique number
Posted on 2019-07-27 03:38:35
Assuming your DF splits predictably into groups of 6 columns, the following will produce an Iterator[Dataset], where each element contains 6 columns from the parent dataset:
scala> df.printSchema
root
|-- net1: string (nullable = false)
|-- net1_a: integer (nullable = false)
|-- net1_b: integer (nullable = false)
|-- net1_c: integer (nullable = false)
|-- net1_d: integer (nullable = false)
|-- net1_e: integer (nullable = false)
|-- net2: string (nullable = false)
|-- net2_a: integer (nullable = false)
|-- net2_b: integer (nullable = false)
|-- net2_c: integer (nullable = false)
|-- net2_d: integer (nullable = false)
|-- net2_e: integer (nullable = false)
|-- net3: string (nullable = false)
|-- net3_a: integer (nullable = false)
|-- net3_b: integer (nullable = false)
|-- net3_c: integer (nullable = false)
|-- net3_d: integer (nullable = false)
|-- net3_e: integer (nullable = false)
|-- net4: string (nullable = false)
|-- net4_a: integer (nullable = false)
|-- net4_b: integer (nullable = false)
|-- net4_c: integer (nullable = false)
|-- net4_d: integer (nullable = false)
|-- net4_e: integer (nullable = false)
|-- net5: string (nullable = false)
|-- net5_a: integer (nullable = false)
|-- net5_b: integer (nullable = false)
|-- net5_c: integer (nullable = false)
|-- net5_d: integer (nullable = false)
|-- net5_e: integer (nullable = false)
scala> val sub_dfs = df.schema.map(_.name).grouped(6).map{fields => df.select(fields.map(col): _*).where(col(fields.head).isNotNull)}
sub_dfs: Iterator[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = non-empty iterator
scala> sub_dfs.foreach{_.printSchema}
root
|-- net1: string (nullable = false)
|-- net1_a: integer (nullable = false)
|-- net1_b: integer (nullable = false)
|-- net1_c: integer (nullable = false)
|-- net1_d: integer (nullable = false)
|-- net1_e: integer (nullable = false)
root
|-- net2: string (nullable = false)
|-- net2_a: integer (nullable = false)
|-- net2_b: integer (nullable = false)
|-- net2_c: integer (nullable = false)
|-- net2_d: integer (nullable = false)
|-- net2_e: integer (nullable = false)
root
|-- net3: string (nullable = false)
|-- net3_a: integer (nullable = false)
|-- net3_b: integer (nullable = false)
|-- net3_c: integer (nullable = false)
|-- net3_d: integer (nullable = false)
|-- net3_e: integer (nullable = false)
root
|-- net4: string (nullable = false)
|-- net4_a: integer (nullable = false)
|-- net4_b: integer (nullable = false)
|-- net4_c: integer (nullable = false)
|-- net4_d: integer (nullable = false)
|-- net4_e: integer (nullable = false)
root
|-- net5: string (nullable = false)
|-- net5_a: integer (nullable = false)
|-- net5_b: integer (nullable = false)
|-- net5_c: integer (nullable = false)
|-- net5_d: integer (nullable = false)
|-- net5_e: integer (nullable = false)
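Note that grouped returns a single-pass Iterator, so once the printSchema loop above has consumed sub_dfs it is empty. If the sub-DataFrames are needed more than once, one option (a minimal sketch, reusing the same df and grouping as above) is to materialize them into a Map keyed by each group's leading column:

import org.apache.spark.sql.functions.col

// Same grouping as above, but keyed by the first column of each group
// ("net1", "net2", ...) and materialized with toMap so it can be reused.
val subDfByName = df.schema.map(_.name).grouped(6).map { fields =>
  fields.head -> df.select(fields.map(col): _*).where(col(fields.head).isNotNull)
}.toMap

subDfByName("net3").show()  // look up any group by name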
Posted on 2019-07-27 04:07:41
The columns in your DataFrame appear to follow a pattern, since they start with a common string. If that does not change, you can use something like the following.
val df_net1 = df.select(df.columns.filter(_.startsWith("net1")).map(df(_)): _*)
val df_net2 = df.select(df.columns.filter(_.startsWith("net2")).map(df(_)): _*)
val df_net3 = df.select(df.columns.filter(_.startsWith("net3")).map(df(_)): _*)
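As the question points out, startsWith breaks down once there are more than nine groups: the prefix "net1" also matches net10, net11 ... net100. A sketch that avoids the collision by matching the exact group number instead (assuming the netN / netN_x naming from the question; netGroup is a hypothetical helper):

import org.apache.spark.sql.DataFrame

// Select group i by its exact number: matches "net3" and "net3_a",
// but not "net30" or "net300".
def netGroup(df: DataFrame, i: Int): DataFrame = {
  val cols = df.columns.filter(_.matches(s"net$i(_.*)?"))
  df.select(cols.map(df(_)): _*)
}

val df_net1 = netGroup(df, 1)  // no longer picks up net10, net100, ...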
Posted on 2019-07-27 05:10:17
I would combine your different net groups into a single set of fields with a net_type column identifying which group each row came from. Then you can do a partitioned write, which makes it easy to load a single group, or several, as needed.
This gives you a couple of benefits:
- A filter on net_type automatically determines which files get loaded for you
- All the output files are produced in a single Spark write, rather than one write per group
Here is the code that does this:
import org.apache.spark.sql.functions._
case class Net(net1:Integer,
net1_a:Integer,
net1_b:Integer,
net2:Integer,
net2_a:Integer,
net2_b:Integer)
val df = Seq(
Net(1, 1, 1, null, null, null),
Net(2, 2, 2, null, null, null),
Net(null, null, null, 3, 3, 3)
).toDS
// You could find these automatically if you wanted
val columns = Seq("net1", "net2")
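// A sketch of the automatic discovery mentioned above, assuming the
// netN / netN_x naming from the question (group columns have no "_"):
// val columns = df.columns.filter(_.matches("""net\d+""")).toSeq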
// Turn each group of fields into a struct with a populated "net_type" field
val structColumns = columns.map(c =>
when(col(c).isNotNull,
struct(
lit(c) as "net_type",
col(c) as "net",
col(c + "_a") as "net_a",
col(c + "_b") as "net_b"
)
)
)
// Put into one column the populated group for each row
val df2 = df.select(coalesce(structColumns:_*) as "net")
// Flatten back down to top level fields instead of being in a struct
val df3 = df2.selectExpr("net.*")
df3.write.partitionBy("net_type").parquet("/some/file/path.parquet")
This gives you rows that look like this:
scala> df3.show
+--------+---+-----+-----+
|net_type|net|net_a|net_b|
+--------+---+-----+-----+
| net1| 1| 1| 1|
| net1| 2| 2| 2|
| net2| 3| 3| 3|
+--------+---+-----+-----+
The files on the filesystem look like this:
/some/file/path.parquet/
net_type=net1/
part1.parquet
..
net_type=net2/
part1.parquet
..
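Reading the data back, a filter on net_type then prunes down to just that partition's directory (a quick sketch, using the same path as above):

// Partition pruning: only the files under net_type=net1/ are scanned.
val net1 = spark.read.parquet("/some/file/path.parquet")
  .where(col("net_type") === "net1")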
https://stackoverflow.com/questions/57225324