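I'm working on a small project that converts student data into intervals. The program simply reads the data and takes the marks (integers) from the mark column so that, after sorting them in ascending order, it can turn them into intervals. Can anyone help me with this part? Many thanks: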
Code:
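import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
// Assumes spark-shell, where sc and the toDF / $"..." implicits are already in scope
case class Rating(mark: Int, classes: String, schooles: String, name: String)
// Parse each CSV line (mark, class, school, name) into a Rating, then build a DataFrame
val Result = sc.textFile("hdfs://schools:9000/input/marks.csv").map(_.split(",")).map(p => Rating(p(0).toInt, p(1).trim, p(2).trim, p(3).trim)).toDF
// Collect every mark per (class, school, name) group and keep only groups with more than 10 marks
val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt").where($"cnt" > 10)
val mrk=all_marks.select("marks")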
The part I need help with is:
// Attempt (pseudocode, not valid Scala)
mrk.foreach(
var ascending=mrk.sort
var interval=ascending[0]+"-"+ascending[ascending.size]
)
How can I read the marks row by row so that I can sort them and convert them into intervals?
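Independent of Spark, the per-row step being asked about can be sketched in plain Scala. This is a minimal sketch, assuming each row's marks arrive as a Seq[Int]; the object name MarksInterval and the function toInterval are hypothetical. Scala indexes with parentheses rather than brackets, and the last element sits at index size - 1, not size.

```scala
object MarksInterval {
  // Hypothetical helper: sort the marks ascending and format "lowest-highest".
  // Returns None for an empty sequence instead of throwing on index 0.
  def toInterval(marks: Seq[Int]): Option[String] =
    if (marks.isEmpty) None
    else {
      val ascending = marks.sorted
      // first element is at index 0, last at index size - 1
      Some(s"${ascending(0)}-${ascending(ascending.size - 1)}")
    }

  def main(args: Array[String]): Unit = {
    println(toInterval(Seq(58, 56, 61, 60))) // Some(56-61)
    println(toInterval(Seq.empty[Int]))      // None
  }
}
```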
Posted 2017-07-17 12:54:51
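You can create a user-defined function (UDF) that builds a new interval field from the list.
Below is a simple example; since you have already computed the marks column, it works directly on that column: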
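import org.apache.spark.sql.functions._
import spark.implicits._ // toDF and $"..." (imported automatically in spark-shell)
val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks")
// UDF that sorts each list and formats it as "first - last"
val testUdf = udf((list: Seq[Int]) => {
val ascending = list.sorted //sorts in ascending order
s"${ascending(0)} - ${ascending(ascending.size - 1)}"
})
ddf1.withColumn("marks", testUdf($"marks")).show()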
Output:
+-----+
|marks|
+-----+
|1 - 3|
|3 - 6|
+-----+
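The function passed to udf is ordinary Scala, so its behavior can be checked without a Spark session. A minimal sketch, assuming the same two input lists as ddf1; the object name UdfBodyCheck is hypothetical:

```scala
object UdfBodyCheck {
  // Same body as the lambda given to udf(...) in the answer
  val toInterval: Seq[Int] => String = list => {
    val ascending = list.sorted // sorts in ascending order
    s"${ascending(0)} - ${ascending(ascending.size - 1)}"
  }

  def main(args: Array[String]): Unit = {
    // Same rows as ddf1: List(2,3,1) and List(6,4,3)
    Seq(List(2, 3, 1), List(6, 4, 3)).map(toInterval).foreach(println)
    // prints "1 - 3" then "3 - 6", matching the table above
  }
}
```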
Hope this helps!
Posted 2017-07-17 13:28:40
Another way to get this result is to convert the DataFrame into an RDD of lists, apply a map function, and then convert the RDD back into a DataFrame:
mrk.rdd.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")
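Note: getList[Int] returns a java.util.List, so to turn it into a Scala List we have to call the toList method after importing scala.collection.JavaConversions._, which supplies the implicit conversion. JavaConversions is deprecated as of Scala 2.12; with scala.collection.JavaConverters._ the explicit equivalent is getList[Int](0).asScala.toList.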
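The Java-to-Scala list conversion from the note can be tried in isolation. A minimal sketch, assuming Scala 2.12 or later and using the explicit scala.collection.JavaConverters API (.asScala) instead of the implicit JavaConversions; on 2.13+ scala.jdk.CollectionConverters._ is the non-deprecated equivalent. The object JavaListToInterval is hypothetical, and java.util.Arrays.asList stands in for the value getList would return:

```scala
import scala.collection.JavaConverters._

object JavaListToInterval {
  // Explicit java.util.List -> immutable Scala List conversion (no implicit JavaConversions)
  def toScalaList(javaMarks: java.util.List[Int]): List[Int] = javaMarks.asScala.toList

  // Same formatting as the RDD/Dataset answer: "min - max"
  def interval(marks: List[Int]): String = s"${marks.min} - ${marks.max}"

  def main(args: Array[String]): Unit = {
    // Stand-in for row.getList[Int](0), which returns a java.util.List
    val javaMarks: java.util.List[Int] = java.util.Arrays.asList(57, 59, 58)
    println(interval(toScalaList(javaMarks))) // 57 - 59
  }
}
```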
You can also use the Dataset API instead of the RDD:
mrk.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks")
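Both the RDD and the Dataset variants skip sorting and take l.min and l.max directly, which is enough because an interval only needs its two endpoints. The sketch below, in plain Scala with a hypothetical object MinMaxInterval and made-up sample marks, contrasts sort-based endpoints (O(n log n)) with a single foldLeft pass (O(n)) that tracks both endpoints at once; calling l.min and then l.max is also linear, just two passes.

```scala
object MinMaxInterval {
  // Endpoints via sorting: O(n log n); assumes marks is non-empty
  def viaSort(marks: Seq[Int]): (Int, Int) = {
    val s = marks.sorted
    (s.head, s.last)
  }

  // Endpoints via one foldLeft pass: O(n); assumes marks is non-empty
  def viaFold(marks: Seq[Int]): (Int, Int) =
    marks.tail.foldLeft((marks.head, marks.head)) { case ((lo, hi), m) =>
      (math.min(lo, m), math.max(hi, m))
    }

  def main(args: Array[String]): Unit = {
    val marks = Seq(61, 56, 60, 58)
    val (lo, hi) = viaFold(marks)
    println(s"$lo - $hi")                     // 56 - 61
    println(viaSort(marks) == viaFold(marks)) // true
  }
}
```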
Posted 2017-07-17 19:44:28
I had to use WrappedArray to get the UDF to work, as shown below:
case class Rating(mark: Int, classes: String, schooles: String, name: String)
val Result = sc.parallelize(Seq(
Rating(56, "classA", "SchoolA", "English"),
Rating(57, "classB", "SchoolA", "English"),
Rating(58, "classA", "SchoolA", "English"),
Rating(59, "classB", "SchoolA", "English"),
Rating(60, "classA", "SchoolA", "English"),
Rating(61, "classA", "SchoolA", "English"))).toDF()
val toInterval = udf((marks: scala.collection.mutable.WrappedArray[Int]) => s"${marks.min}-${marks.max}")
val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt")
all_marks.select("marks").withColumn("interval", toInterval(col("marks"))).show()
Output:
+----------------+--------+
|           marks|interval|
+----------------+--------+
|[56, 58, 60, 61]|   56-61|
|        [57, 59]|   57-59|
+----------------+--------+
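To check the expected intervals without a cluster, the same grouping can be mimicked on a local collection. A minimal sketch, assuming the six sample rows from this answer; the object LocalGroupingSketch, its sample value, and the helper intervals are hypothetical, and Scala's groupBy on a Seq stands in for the DataFrame groupBy plus collect_list (it is not Spark's API):

```scala
object LocalGroupingSketch {
  case class Rating(mark: Int, classes: String, schooles: String, name: String)

  // Same six rows as the Spark example
  val sample: Seq[Rating] = Seq(
    Rating(56, "classA", "SchoolA", "English"),
    Rating(57, "classB", "SchoolA", "English"),
    Rating(58, "classA", "SchoolA", "English"),
    Rating(59, "classB", "SchoolA", "English"),
    Rating(60, "classA", "SchoolA", "English"),
    Rating(61, "classA", "SchoolA", "English"))

  // Local analogue of groupBy(...).agg(collect_list("mark")) followed by the min-max UDF
  def intervals(rows: Seq[Rating]): Map[(String, String, String), String] =
    rows
      .groupBy(r => (r.classes, r.schooles, r.name))
      .map { case (key, group) =>
        val marks = group.map(_.mark)
        key -> s"${marks.min}-${marks.max}"
      }

  def main(args: Array[String]): Unit =
    // Sort by class name for a stable print order
    intervals(sample).toSeq.sortBy(_._1._1).foreach { case (key, interval) =>
      println(s"$key -> $interval")
    }
    // (classA,SchoolA,English) -> 56-61
    // (classB,SchoolA,English) -> 57-59
}
```

Unlike collect_list on a cluster, this keeps every row in memory on one machine, so it only suits small test data.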
https://stackoverflow.com/questions/45144208