给定的
val as: RDD[(T, U)]
val bs: RDD[T]
我想对as
进行筛选,以查找具有bs
键的元素。
一种方法是
val intermediateAndOtherwiseUnnessaryPair = bs.map(b => b -> b)
bs.join(as).values
但是,bs
上的映射是不幸的。有没有更直接的方法?
发布于 2016-03-17 22:05:15
通过执行以下操作,可以减少映射的不必要性:
val intermediateAndOtherwiseUnnessaryPair = bs.map(b => (b, 1))
此外,加入前的共同分区也有很大帮助:
val intermediateAndOtherwiseUnnessaryPair = bs.map(b => (b, 1)).paritionBy(new HashPartitioner(NUM_PARTITIONS))
bs.paritionBy(new HashPartitioner(NUM_PARTITIONS)).join(as).values
Co分区的RDDs将不会在运行时被洗牌,因此您将看到显著的性能提升。
如果bs
太大(更准确地说,有大量的唯一值),广播可能无法工作,您也可能希望增加driver.maxResultsize
。
发布于 2016-03-17 20:48:45
使用第二个RDD
过滤一个RDD
的流行和通用方法只有两种(或者至少是我所知道的唯一一种):
1)您已经在做的join
--在本例中,我不太担心不必要的中间RDD
,不过,map()
是一个很窄的转换,不会引入那么多开销。然而,join()
本身很可能是缓慢的,因为它是一个广泛的转换(需要洗牌)
2)在驱动程序上收集bs
并使其成为广播变量,然后在as.filter()
中使用。
val collected = sc.broadcast(bs.collect().toSet)
as.filter(el => collected.value.contains(el))
您需要这样做,因为Spark不支持在调用RDDs
的方法中嵌套RDD
。
https://stackoverflow.com/questions/36076144
复制