在Spark中可以从scala集合中实现'.combinations‘函数吗?
/** Iterates over combinations.
*
* @return An Iterator which traverses the possible n-element combinations of this $coll.
* @example `"abbbc".combinations(2) = Iterator(ab, ac, bb, bc)`
*/例如,对于大小= 2的组合,如何从RDDX到RDD[ListX]或RDD(X,X),并假设RDD中的所有值都是唯一的。
发布于 2014-10-25 17:25:24
笛卡尔积和组合是两种不同的东西,笛卡尔积将创建一个大小为rdd.size() ^ 2的RDD,而组合将创建一个rdd.size() choose 2大小的RDD。
val rdd = sc.parallelize(1 to 5)
val combinations = rdd.cartesian(rdd).filter{ case (a,b) => a < b }`.
combinations.collect()注意,只有在列表元素上定义了排序时,这才能工作,因为我们使用的是<。此方法只适用于选择两个,但可以通过确保序列中所有a和b的关系a < b来轻松扩展。
发布于 2014-10-25 08:03:37
带有cartesian转换的Spark本机支持这一点。
例如:
val rdd = sc.parallelize(1 to 5)
val cartesian = rdd.cartesian(rdd)
cartesian.collect
Array[(Int, Int)] = Array((1,1), (1,2), (1,3), (1,4), (1,5),
(2,1), (2,2), (2,3), (2,4), (2,5),
(3,1), (3,2), (3,3), (3,4), (3,5),
(4,1), (4,2), (4,3), (4,4), (4,5),
(5,1), (5,2), (5,3), (5,4), (5,5))发布于 2014-10-25 20:06:17
如前所述,cartesian将为您提供RDD的笛卡儿积的n^2元素。该算法计算RDD的组合(n,2),而不必先计算n^2元素:(使用字符串作为类型,泛化到类型T需要一些带有classtag的管道,这会模糊这里的目的)
由于强制计算RDD的迭代count和take操作,这可能会降低笛卡尔+滤波的时间效率,但更节省空间,因为它只计算C(n,2) = n!/(2*(n-2))! = (n*(n-1)/2)元素而不是笛卡尔积的n^2。
import org.apache.spark.rdd._
def combs(rdd:RDD[String]):RDD[(String,String)] = {
val count = rdd.count
if (rdd.count < 2) {
sc.makeRDD[(String,String)](Seq.empty)
} else if (rdd.count == 2) {
val values = rdd.collect
sc.makeRDD[(String,String)](Seq((values(0), values(1))))
} else {
val elem = rdd.take(1)
val elemRdd = sc.makeRDD(elem)
val subtracted = rdd.subtract(elemRdd)
val comb = subtracted.map(e => (elem(0),e))
comb.union(combs(subtracted))
}
}https://stackoverflow.com/questions/26557873
复制相似问题