I have the following df:
KSCHL01 VTEXT01 KWERT01 KSCHL02 VTEXT02 KWERT02 KSCHL03 VTEXT03 KWERT03 id
ZBTB Tarif de base 4455.00 ZBFA Brut facturé 4455.00 ZBN Brut Négocié 3645.00 1
ZBT Brut Tarif. 222.75 ZFIF Remises fin d'ordre 0.00 ZMAJ Majorations 0.00 2

I may have more than 13 columns. I want to turn every slice of 3 columns into its own row, to get this expected output:
id KSCHL VTEXT KWERT
1 ZBTB Tarif de base 4455.00
1 ZBFA Brut facturé 4455.00
1 ZBN Brut Négocié 3645.00
2 ZBT Brut Tarif. 222.75
2 ZFIF Remises fin d'ordre 0.00
2 ZMAJ Majorations 0.00

Here is what I did:
for (i <- 0 to df.columns.length - 4 by 3) {
  var temp = df.select(df.columns.slice(i, i + 3).map(col(_)): _*)
  val columns = temp.columns
  val regex = """[0-9]"""
  val replacingColumns = columns.map(regex.r.replaceAllIn(_, "")) // delete all digits in the column names
  val resultDF = replacingColumns.zip(columns).foldLeft(temp) { (tempdf, name) => tempdf.withColumnRenamed(name._2, name._1) }
  res = res.union(resultDF) // append the slice to the final DF
}

which gives me this:
KSCHL VTEXT KWERT
ZBTB Tarif de base 4455.00
ZBFA Brut facturé 4455.00
ZBN Brut Négocié 3645.00
ZBT Brut Tarif. 222.75
ZFIF Remises fin d'ordre 0.00
ZMAJ Majorations 0.00

How can I add the id column to every slice, so that it shows up as a column like in the desired output? I tried:
temp = temp.withColumn("id", df.id)

but I get this error:
error: value id in class Dataset cannot be accessed in org.apache.spark.sql.DataFrame

Thanks.
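For reference, a minimal sketch of one way to keep id inside the loop-based approach above: select "id" together with each 3-column slice instead of attaching it afterwards. This assumes res already exists as an empty DataFrame with columns KSCHL, VTEXT, KWERT, id, and is untested against the exact schema.

import org.apache.spark.sql.functions.col

// Pull "id" into every slice so it survives the union.
// Assumes the wide layout shown above and an empty `res` with columns KSCHL, VTEXT, KWERT, id.
for (i <- 0 to df.columns.length - 4 by 3) {
  val sliced  = df.columns.slice(i, i + 3)
  val temp    = df.select((sliced :+ "id").map(col(_)): _*)
  val renamed = sliced.foldLeft(temp) { (acc, c) =>
    acc.withColumnRenamed(c, c.replaceAll("[0-9]", "")) // KSCHL01 -> KSCHL, etc.
  }
  res = res.union(renamed)
}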
Posted on 2021-06-28 20:10:10
Here is how you can rewrite the code, adjusting the range according to the number of columns:
import org.apache.spark.sql.functions.{array, col, explode, struct}
import spark.implicits._

// zero-padded group suffixes matching the column names: "01", "02", "03"
val range = (1 to 3).map(r => if (r < 10) s"0$r" else s"$r")

// one struct per group, with the numeric suffix stripped from the field names
val structQuery = $"id" +: range.map(n =>
  struct($"KSCHL$n".as("KSCHL"), $"VTEXT$n".as("VTEXT"), $"KWERT$n".as("KWERT")).as(s"struct$n")
)

df.select(structQuery: _*)
  .withColumn("new", explode(array(range.map(r => col(s"struct$r")): _*)))
  .select("id", "new.*")
  .show(false)

Output:
+---+-----+-------------------+------+
|id |KSCHL|VTEXT |KWERT |
+---+-----+-------------------+------+
|1 |ZBTB |Tarif de base |4455.0|
|1 |ZBFA |Brut facturé |4455.0|
|1 |ZBN |Brut Négocié |3645.0|
|2 |ZBT |Brut Tarif. |222.75|
|2 |ZFIF |Remises fin d'ordre|0.0 |
|2 |ZMAJ |Majorations |0.0 |
+---+-----+-------------------+------+
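The range above is hardcoded to three groups. A sketch of deriving it from the actual column names instead, assuming every KSCHLnn column has matching VTEXTnn and KWERTnn columns with zero-padded suffixes as in the question:

// Collect the zero-padded suffixes ("01", "02", ...) from the KSCHL* columns;
// the rest of the query stays exactly the same.
val range = df.columns
  .filter(_.startsWith("KSCHL"))
  .map(_.stripPrefix("KSCHL"))
  .sorted
  .toSeq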
Posted on 2021-06-28 19:40:06

Check the code below.
scala> df.show(false)
+-------+-------------+-------+-------+-------------------+-------+-------+------------+-------+---+--------------+
|KSCHL01|VTEXT01 |KWERT01|KSCHL02|VTEXT02 |KWERT02|KSCHL03|VTEXT03 |KWERT03|id |KSCHL04 |
+-------+-------------+-------+-------+-------------------+-------+-------+------------+-------+---+--------------+
|ZBTB |Tarif de base|4455.00|ZBFA |Brut facturé |4455.00|ZBN |Brut Négocié|3645.00|1 |sample KSCHL03|
|ZBT |Brut Tarif. |222.75 |ZFIF |Remises fin d'ordre|0.00 |ZMAJ |Majorations |0.00 |2 |sample KSCHL03|
+-------+-------------+-------+-------+-------------------+-------+-------+------------+-------+---+--------------+

scala> val singleColumns = df.columns.filter(c => c.filter(_.isDigit).length == 0).map(col)
singleColumns: Array[org.apache.spark.sql.Column] = Array(id)

scala> val multipleColumns = df.columns.filter(c => c.filter(_.isDigit).length != 0).map(c => (c.filterNot(_.isDigit),c,c.filter(_.isDigit)))
multipleColumns: Array[(String, String, String)] = Array((KSCHL,KSCHL01,01), (VTEXT,VTEXT01,01), (KWERT,KWERT01,01), (KSCHL,KSCHL02,02), (VTEXT,VTEXT02,02), (KWERT,KWERT02,02), (KSCHL,KSCHL03,03), (VTEXT,VTEXT03,03), (KWERT,KWERT03,03), (KSCHL,KSCHL04,04))

scala> val distinctColumns = multipleColumns.map(_._1).distinct
distinctColumns: Array[String] = Array(KSCHL, VTEXT, KWERT)

scala> :paste
// Entering paste mode (ctrl-D to finish)
val colExpr = array(
multipleColumns
.groupBy(_._3)
.map(k => struct(
k._2.map(c => col(c._2).as(c._1)) ++
distinctColumns.filter(c => k._2.filter(_._1 == c).length == 0).map(c => lit("").as(c)) ++
singleColumns:_*
).as("data"))
.toSeq:_*
).as("array_data")
// Exiting paste mode, now interpreting.
colExpr: org.apache.spark.sql.Column = array(named_struct(NamePlaceholder(), KSCHL03 AS `KSCHL`, NamePlaceholder(), VTEXT03 AS `VTEXT`, NamePlaceholder(), KWERT03 AS `KWERT`, NamePlaceholder(), id) AS `data`, named_struct(NamePlaceholder(), KSCHL02 AS `KSCHL`, NamePlaceholder(), VTEXT02 AS `VTEXT`, NamePlaceholder(), KWERT02 AS `KWERT`, NamePlaceholder(), id) AS `data`, named_struct(NamePlaceholder(), KSCHL01 AS `KSCHL`, NamePlaceholder(), VTEXT01 AS `VTEXT`, NamePlaceholder(), KWERT01 AS `KWERT`, NamePlaceholder(), id) AS `data`, named_struct(NamePlaceholder(), KSCHL04 AS `KSCHL`, VTEXT, AS `VTEXT`, KWERT, AS `KWERT`, NamePlaceholder(), id) AS `data`) AS `array_data`

scala> :paste
// Entering paste mode (ctrl-D to finish)
val finalDF = df
.select(colExpr)
.withColumn("array_data",explode_outer($"array_data"))
.select("array_data.*")
// Exiting paste mode, now interpreting.

scala> finalDF.show(false)
+--------------+-------------------+-------+---+
|KSCHL |VTEXT |KWERT |id |
+--------------+-------------------+-------+---+
|ZBN |Brut Négocié |3645.00|1 |
|ZBFA |Brut facturé |4455.00|1 |
|ZBTB |Tarif de base |4455.00|1 |
|sample KSCHL03| | |1 |
|ZMAJ |Majorations |0.00 |2 |
|ZFIF |Remises fin d'ordre|0.00 |2 |
|ZBT |Brut Tarif. |222.75 |2 |
|sample KSCHL03| | |2 |
+--------------+-------------------+-------+---+
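As a side note, the same wide-to-long reshape can also be written with Spark SQL's stack generator. A minimal sketch, assuming exactly three complete column groups as in the question:

// stack(3, ...) splits the nine value columns into three (KSCHL, VTEXT, KWERT) rows per input row.
df.selectExpr(
  "id",
  """stack(3,
       KSCHL01, VTEXT01, KWERT01,
       KSCHL02, VTEXT02, KWERT02,
       KSCHL03, VTEXT03, KWERT03) as (KSCHL, VTEXT, KWERT)"""
).show(false)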
https://stackoverflow.com/questions/68161131