我有两个输入,第一个输入是流(比方说input1),第二个是批处理(比方说input2)。我想知道第一个输入中的键是否与第二个输入中的单行或多行匹配。进一步的转换/逻辑取决于匹配的行数,是单行匹配还是多行匹配(对于第一个输入中的至少一个键)
if(single row matches){
// do something
}else{
// do something
}到目前为止我尝试过的代码
val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
val result = joinData.mapValues{
case(v, Some(a)) => 1L
case(v, None) => 0
}.reduceByKey(_ + _).filter(_._2 > 1)我已经完成了上面的编码。当我执行result.print时,如果所有键只与input2中的一行匹配,则不打印任何内容。由于DStream可能具有多个DStream,因此不确定如何判断RDD是否为空。如果这是可能的,那么我可以做一个If检查。
发布于 2016-06-21 05:16:33
由于DStream表示一段时间内的集合,因此没有函数来确定DStream是否为空。从概念的角度来看,空的DStream将是一个永远不会有数据的流,这将不是很有用。
可以做的是检查给定的微批是否有数据:
dstream.foreachRDD{ rdd => if (rdd.isEmpty) {...} }请注意,在任何给定的时间点,只有一个RDD。
我认为实际的问题是如何检查引用RDD和DStream中的数据之间的匹配数量。可能最简单的方法是将两个集合相交并检查交集大小:
val intersectionDStream = streamData.transform{rdd => rdd.intersection(input2)}
intersectionDStream.foreachRDD{rdd =>
if (rdd.count > 1) {
..do stuff with the matches
} else {
..do otherwise
}
}我们还可以将以RDD为中心的转换放在foreachRDD操作中:
streamData.foreachRDD{rdd =>
val matches = rdd.intersection(input2)
if (matches.count > 1) {
..do stuff with the matches
} else {
..do otherwise
}
}https://stackoverflow.com/questions/37921872
复制相似问题