我想从DStream中删除前n个RDDs。我尝试将以下函数与transform一起使用,但它不起作用(ERROR OneForOneStrategy: org.apache.spark.SparkContext java.io.NotSerializableException),而且我认为它不会实现我删除RDD的真正目标,因为它将返回空的RDD。
var num = 0
def dropNrdds(myRDD: RDD[(String, Int)], dropNum: Int) : RDD[(String, Int)] = {
if (num < dropNum) {
num = num + 1
return myRDD
}
else {
return sc.makeRDD(Seq())
}
}
发布于 2014-10-28 10:10:04
出现这个错误是因为您的函数引用了您的var num
,并且包含的类不是Serializable
。您的函数将由集群的不同节点调用,因此它所依赖的任何东西都必须是Serializable
的,并且您不能在函数的不同调用之间共享变量(因为它们可能在不同的集群节点上运行)。
要从DStream
中删除特定数量的RDD
似乎非常奇怪,因为特定DStream
的拆分方式很大程度上是一个实现细节。也许基于时间的slice
方法可以做你想做的事?
发布于 2016-08-08 21:09:22
你得到了错误,因为,我猜你是从
foreachRdd
循环,它实际上是在executers节点上执行的,如果你想在executor节点上执行一些东西,那么这段代码必须是可序列化的,而SparkContext(sc,你在dropNrdds方法中引用它)是不可序列化的,因此你得到了那个错误。
然后来问你的实际问题。
不确定你的要求,但是
您可以为您的RDD创建一个DataFrame,并选择符合您的条件的记录。忽略剩下的部分。
或
您可以使用filter并创建一个带有filters data的全新
。
https://stackoverflow.com/questions/26601721
复制