我的记录是包含1000个字段的字符串,在dataframe中使用分隔符作为逗号,例如
"a,b,c,d,e.......upto 1000“-第1条记录"p,q,r,s,t ......upto 1000”-第2条记录
我使用的是来自stackoverflow的以下建议解决方案
Split 1 column into 3 columns in spark scala
df.withColumn("_tmp", split($"columnToSplit", "\\.")).select($"_tmp".getItem(0).as("col1"),$"_tmp".getItem(1).as("col2"),$"_tmp".getItem(2).as("col3")).drop("_tmp")
然而,在我的例子中,我有1000列,这些列在JSON模式中,我可以像这样检索
column_seq:Seq[Array]=Schema_func.map(_.name)
for(i <-o to column_seq.length-1){println(i+" " + column_seq(i))}
它返回的结果如下
0 col1 1 col2 2 col3 3 col4
现在,我需要将所有这些索引和列名传递给DataFrame的以下函数
df.withColumn("_tmp", split($"columnToSplit", "\\.")).select($"_tmp".getItem(0).as("col1"),$"_tmp".getItem(1).as("col2"),$"_tmp".getItem(2).as("col3")).drop("_tmp")
在……里面
$"_tmp".getItem(0).as("col1"),$"_tmp".getItem(1).as("col2"),
由于我不能创建包含1000列的long语句,有没有任何有效的方法可以将上面提到的json schema中的所有这些参数传递给select函数,这样我就可以拆分列,添加头,然后将DF转换为parquet。
发布于 2018-06-02 05:44:13
您可以构建一系列org.apache.spark.sql.Column
,其中每个列都是选择正确的项并具有正确的名称的结果,然后select
这些列:
val columns: Seq[Column] = Schema_func.map(_.name)
.zipWithIndex // attach index to names
.map { case (name, index) => $"_tmp".getItem(index) as name }
val result = df
.withColumn("_tmp", split($"columnToSplit", "\\."))
.select(columns: _*)
例如,对于此输入:
case class A(name: String)
val Schema_func = Seq(A("c1"), A("c2"), A("c3"), A("c4"), A("c5"))
val df = Seq("a.b.c.d.e").toDF("columnToSplit")
result
将为:
// +---+---+---+---+---+
// | c1| c2| c3| c4| c5|
// +---+---+---+---+---+
// | a| b| c| d| e|
// +---+---+---+---+---+
https://stackoverflow.com/questions/50651014
复制相似问题