我从一个kafka源读取记录到mydataframe
spark dataframe。我想从row
中选择一些列并执行一些操作。因此,为了检查我是否获得了正确的索引,我尝试在语句println(row.getFieldIndex(pathtoDesiredColumnFromSchema))
中打印索引,如下所示:
val pathtoDesiredColumnFromSchema = "data.root.column1.column2.field"
val myQuery = mydataframe.writeStream.foreach(new ForeachWriter[Row]() {
override def open(partitionId: Long, version: Long): Boolean = true
override def process(row: Row): Unit = {
println(row.getFieldIndex(pathtoDesiredColumnFromSchema))
}
override def close(errorOrNull: Throwable): Unit = {}
}).outputMode("append").start()
但是上面的代码表明该行只有一个名称data
,并且没有列名data.root.column1.column2.field
。
通过名称路径从spark sql行获取列值的正确方法是什么?
发布于 2018-08-02 07:32:48
您可以对struct
类型使用getAs
调用链,例如:
val df = spark.range(1,5).toDF.withColumn("time", current_timestamp())
.union(spark.range(5,10).toDF.withColumn("time", current_timestamp()))
.groupBy(window($"time", "1 millisecond")).count
df.printSchema
root
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- count: long (nullable = false)
df.take(1).head
.getAs[org.apache.spark.sql.Row]("window")
.getAs[java.sql.Timestamp]("start")
希望它能帮上忙!
发布于 2018-08-02 05:29:43
如果您只想打印DataFrame
的字段,您可以使用
mydataframe.select(pathtoDesiredColumnFromSchema).foreach(println(_.get(0)))
https://stackoverflow.com/questions/51639177
复制相似问题