完全新鲜的Scala和Spark!得到了一系列k-v数组形式的(String, (Double, Double, Int))
数据。现在对这些数据使用groupByKey()
方法,这样就可以得到几个(String, Seq[(Double, Double, Int)])
组。我如何进入第一个大组,然后通过Seq部分,然后移动到下一个大组?我说,也许我会得到
("id1", [(1.1,2.2,3), (4.4,5.5,6), (7.7,8.8,9)]),
("id2", [(10.10,11.11,12), (13.13,14.14,15)])
在我的记忆里。我将深入研究"id1“以迭代地查看这三个数据,甚至进行一些更改。然后我跳到“id2”中,也迭代数据。如何编写代码?>_<
发布于 2017-01-08 18:04:16
如果它是(String, Seq[(Double, Double, Int)])
的RDD,那么您可以使用标准映射对其进行迭代。
val data: RDD[(String, Seq[(Double, Double, Int)])] = _ //Your RDD Here
data.map {
case (key, value) =>
value.map {
case (first, second, third) => first * second * third
}
}
我会考虑使用Dataframe
或以其他方式结构化数据,因为这可能是一种相当笨拙的数据结构化方式。
你可以在这里找到一些关于数据帧/数据集的信息,http://spark.apache.org/docs/latest/sql-programming-guide.html可能更适合你的问题,如果你不习惯使用它们,它会让你编写更多类似SQL的语句,而不是映射。
下面是一个完整的、有点下流的例子
import org.apache.spark._
object SparkExample extends App {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("App")
val sess: SparkContext = new SparkContext(conf)
val data: Seq[(String, Seq[(Double, Double, Int)])] = Seq[(String, Seq[(Double, Double, Int)])](
("id1", Seq[(Double, Double, Int)]((1.1, 2.2, 3), (4.4, 5.5, 6), (7.7, 8.8, 9))),
("id2", Seq[(Double, Double, Int)]((10.10, 11.11, 12), (13.13, 14.14, 15)))
)
val rdd: RDD[(String, Seq[(Double, Double, Int)])] = sess.parallelize(data)
val d: Array[Seq[Double]] = rdd.map {
case (key, value) => value.map {
case (first, second, third) => first + second + third
}
}.collect()
println(d.mkString(", "))
}
https://stackoverflow.com/questions/41535742
复制相似问题