我想把数据的每一行的输入和retun输出作为一个映射。
def getV(str:String,colValues:Map[String,Any]):Any={
var s= colValues.get("str") match {
case Some(value) =>value
case None =>None
}
return s
}
def setV(key:String,value:String,colValues:scala.collection.mutable.Map[String,Any]):Unit={
colValues(key)=value
}
def returnNotEmptyCols_map(inputRow: Row): collection.mutable.Map[String,Any] = {
implicit val formats = Serialization.formats(NoTypeHints)
var colValues = inputRow.getValuesMap[Any](inputRow.schema.fieldNames)
var mutMap = collection.mutable.Map(colValues.toSeq: _*)
//Write my operations here------
//print(getV("number",colValues))
//setV("hey","there",mutMap)
mutMap
//return df
}
然而,我面临的问题是,我使用udf从dataframe调用这个函数,因此结果输出是列格式的。
spark.udf.register("myFilterFunction", returnNotEmptyCols _)
import spark.implicits._
val df = Seq(
(12, "bat"),
(13, "mouse"),
(14, "horse")
).toDF("number", "word")
var dr=df.rdd.collect
val newDF=df.withColumn("newcl",callUDF("myFilterFunction",struct(df.columns.map(df(_)) : _*)))
有什么方法可以将行作为输入传递给methood returnNotEmptyCols_map函数,并将oupput作为映射(而不是数据格式的列)。我们可以使用下面的链接function to each row of Spark Dataframe来实现这一点
但它也是一个udf,结果输出为列格式。
发布于 2022-02-10 08:02:09
UDF的目的是继续使用DataFrame。如果不需要DataFrame,可以使用函数作为参数在底层的rdd上运行映射,并获得函数返回类型的rdd。
val rdd = df.rdd.map(returnNotEmptyCols_map)
它不适用于Dataset.map,因为Any
然后你可以做rdd.collect
什么的
https://stackoverflow.com/questions/71059890
复制相似问题