首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花流缓存和转换

火花流缓存和转换
EN

Stack Overflow用户
提问于 2014-10-20 06:21:33
回答 1查看 917关注 0票数 3

我是新来的火花,我使用星火流与卡夫卡..。

我的流持续时间是1秒。

假设我在第一批中得到100条记录,在第二批中得到120条记录,在第三批中得到80条记录。

代码语言:javascript
运行
复制
--> {sec 1   1,2,...100} --> {sec 2 1,2..120} --> {sec 3 1,2,..80}

我在第一批中应用了我的逻辑,并得到了一个结果=> result1

我想在处理第二批时使用result1,并将第二批的result1和120个记录合并为=> result2。

我试图缓存结果,但我无法在2s中获得缓存的result1,有可能吗?或者在这里展示一下如何实现我的目标?

代码语言:javascript
运行
复制
 JavaPairReceiverInputDStream<String, String> messages =   KafkaUtils.createStream(jssc, String.class,String.class, StringDecoder.class,StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

我处理消息并查找单词,这是1秒的结果。

代码语言:javascript
运行
复制
if(resultCp!=null){
                resultCp.print();
                result = resultCp.union(words.mapValues(new Sum()));

            }else{
                result = words.mapValues(new Sum());
            }

 resultCp =  result.cache();

当在第二批中,resultCp不应该是null,但是它返回null值,所以在任何给定的时间,我只有特定的秒数据,我想要找到累积的结果。有人知道怎么做吗。

我了解到,一旦火花流启动,jssc.start(),控制不再在我们的末端,它是由火花。那么,是否可以将第一批的结果发送到第二批,以求累积价值?

任何帮助都是非常感谢的。提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2014-10-20 10:00:20

我认为您正在寻找updateStateByKey,它通过对提供的DStream和某些状态应用累积函数来创建一个新的DStream。这个来自星火样例包的例子涵盖了问题中的情况:

首先,您需要一个接受新值和以前已知值的更新函数:

代码语言:javascript
运行
复制
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum

  val previousCount = state.getOrElse(0)

  Some(currentCount + previousCount)
}

该函数用于创建一个从源dstream累积值的Dstream。如下所示:

代码语言:javascript
运行
复制
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using updateStateByKey
// This will give a Dstream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 

来源:https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26459550

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档