首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何判断DStream是否为空

如何判断DStream是否为空
EN

Stack Overflow用户
提问于 2016-06-20 19:52:45
回答 1查看 3.8K关注 0票数 2

我有两个输入,第一个输入是流(比方说input1),第二个是批处理(比方说input2)。我想知道第一个输入中的键是否与第二个输入中的单行或多行匹配。进一步的转换/逻辑取决于匹配的行数,是单行匹配还是多行匹配(对于第一个输入中的至少一个键)

代码语言:javascript
运行
复制
if(single row matches){
     // do something
}else{
     // do something
}

到目前为止我尝试过的代码

代码语言:javascript
运行
复制
val input1Pair = streamData.map(x => (x._1, x))
val input2Pair = input2.map(x => (x._1, x))
val joinData = input1Pair.transform{ x => input2Pair.leftOuterJoin(x)}
val result = joinData.mapValues{
    case(v, Some(a)) => 1L
    case(v, None) => 0
 }.reduceByKey(_ + _).filter(_._2 > 1)

我已经完成了上面的编码。当我执行result.print时,如果所有键只与input2中的一行匹配,则不打印任何内容。由于DStream可能具有多个DStream,因此不确定如何判断RDD是否为空。如果这是可能的,那么我可以做一个If检查。

EN

回答 1

Stack Overflow用户

发布于 2016-06-21 05:16:33

由于DStream表示一段时间内的集合,因此没有函数来确定DStream是否为空。从概念的角度来看,空的DStream将是一个永远不会有数据的流,这将不是很有用。

可以做的是检查给定的微批是否有数据:

代码语言:javascript
运行
复制
dstream.foreachRDD{ rdd => if (rdd.isEmpty) {...} }

请注意,在任何给定的时间点,只有一个RDD。

我认为实际的问题是如何检查引用RDD和DStream中的数据之间的匹配数量。可能最简单的方法是将两个集合相交并检查交集大小:

代码语言:javascript
运行
复制
val intersectionDStream = streamData.transform{rdd => rdd.intersection(input2)}
intersectionDStream.foreachRDD{rdd => 
    if (rdd.count > 1) {
       ..do stuff with the matches
    } else {
       ..do otherwise
    }
}

我们还可以将以RDD为中心的转换放在foreachRDD操作中:

代码语言:javascript
运行
复制
streamData.foreachRDD{rdd => 
    val matches = rdd.intersection(input2)
    if (matches.count > 1) {
       ..do stuff with the matches
    } else {
       ..do otherwise
    }
}
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37921872

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档