我有两个流,我希望能够基于每x
秒运行一次的计算来消耗其中一个流。
我认为我基本上需要创建第三个tick
流-类似于every(3.seconds)
-来执行计算,然后在其他两个流之间进行切换。
我有点被困在这里了(我才刚刚开始使用scalaz-stream)。
谢谢!
发布于 2016-05-18 18:37:50
有几种方法可以解决这个问题。解决这个问题的一种方法是使用awakeEvery
。具体示例请参见here。
为了简单地描述这个例子,假设我们想要每隔5秒查询一次twitter,并获得tweet并执行情感分析。我们可以按如下方式编写此管道:
val source =
awakeEvery(5 seconds) |> buildTwitterQuery(query) through queryChannel flatMap {
Process emitAll _
}
请注意,可以按如下方式声明queryChannel
。
def statusTask(query: Query): Task[List[Status]] = Task {
twitterClient.search(query).getTweets.toList
}
val queryChannel: Channel[Task, Query, List[Status]] = channel lift statusTask
如果你有任何问题,请告诉我。如前所述,有关完整示例,请参阅this。
我希望它能有所帮助!
https://stackoverflow.com/questions/35946734
复制相似问题