有没有可能在不触发"Substream不能物化一次以上“的情况下缓存并重用源?
我正在做一个流连接,它需要为左边的每个元素调用一个微服务。该调用返回要加入的记录流。我希望缓存源,以便对微服务的相同调用在缓存流中产生结果。但是我正在做的flatMapConcat抛出了"Substream不能被物化不止一次“错误。代码如下所示:
val cache = new util.HashMap[AnyRef, Source[Array[AnyRef], Any]]()
inputSource
.flatMapConcat { record =>
val key = leftKey(record)
val rightElemSource = if (cache.containsKey(key)) {
cache.get(key)
} else {
val rightElemSourceInner = doSomethingToGetSource()
cache.put(key, rightElemSourceInner)
rightElemSourceInner
}
rightElemSource.map(join(record, _))
}
发布于 2019-03-28 03:57:44
Source
表示一个潜在的巨大甚至无限的数据流。它被设计成像Iterator
一样只被遍历一次。如果你真的想要重用源代码的内容,你必须将它收集到一个常规的数据结构中,比如Seq
。所以你的缓存应该是util.HashMap[AnyRef, Seq[Array[AnyRef], Any]]
类型的。
https://stackoverflow.com/questions/55378093
复制相似问题