我想组合两个scalaz流,并使用一个谓词从两个流中选择下一个元素。例如,我希望这个测试能够通过:
val a = Process(1, 2, 5, 8)
val b = Process(3, 4, 5, 7)
choose(a, b)(_ < _).toList shouldEqual List(1, 2, 3, 4, 5, 5, 7, 8)我尝试了一种我认为可行的解决方案。它被编译了!但如果它什么也做不了就该死了。JVM挂起:(
import scalaz.stream.Process._
import scalaz.stream._
object StreamStuff {
def choose[F[_], I](a:Process[F, I], b:Process[F, I])(p: (I, I) => Boolean): Process[F, I] =
(a.awaitOption zip b.awaitOption).flatMap {
case (Some(ai), Some(bi)) =>
if(p(ai, bi)) emit(ai) ++ choose(a, emit(bi) ++ b)(p)
else emit(bi) ++ choose(emit(ai) ++ a, b)(p)
case (None, Some(bi)) => emit(bi) ++ b
case (Some(ai), None) => emit(ai) ++ a
case _ => halt
}
}请注意,以上是我的第二次尝试。在我的第一次尝试中,我试图创建一个Tee,但是我不知道如何取消使用失败者元素。我觉得我需要像这里这样递归的东西。
我使用的是streams版本的0.7.3a。
非常感谢任何技巧(包括增量提示,因为我想简单地学习如何自己解决这些问题)!
发布于 2016-07-14 01:29:47
我将在下面给出几个提示和一个实现,所以如果你想自己找出一个解决方案,你可能会想要覆盖屏幕。
免责声明:这只是我想到的第一种方法,我对scalaz-stream API的熟悉有点生疏,所以可能有更好的方法来实现这个操作,这个方法可能在某些可怕的方面是完全错误的,等等。
提示1
您可以在下一个递归调用中传递它们,而不是试图“取消消耗”失败的元素。
提示2
通过指出哪一方最后输掉了,可以避免累积多个输掉的元素。
提示3
当我使用Scalaz流时,我经常发现首先使用普通集合来勾勒出一个实现会更容易一些。下面是列表所需的helper方法:
/**
* @param p if true, the first of the pair wins
*/
def mergeListsWithHeld[A](p: (A, A) => Boolean)(held: Either[A, A])(
ls: List[A],
rs: List[A]
): List[A] = held match {
// Right is the current winner.
case Left(l) => rs match {
// ...but it's empty.
case Nil => l :: ls
// ...and it's still winning.
case r :: rt if p(r, l) => r :: mergeListsWithHeld(p)(held)(ls, rt)
// ...upset!
case r :: rt => l :: mergeListsWithHeld(p)(Right(r))(ls, rt)
}
// Left is the current winner.
case Right(r) => ls match {
case Nil => r :: rs
case l :: lt if p(l, r) => l :: mergeListsWithHeld(p)(held)(lt, rs)
case l :: lt => r :: mergeListsWithHeld(p)(Left(l))(lt, rs)
}
}这假设我们已经有了一个失败的元素,但现在我们可以编写我们实际想要使用的方法:
def mergeListsWith[A](p: (A, A) => Boolean)(ls: List[A], rs: List[A]): List[A] =
ls match {
case Nil => rs
case l :: lt => rs match {
case Nil => ls
case r :: rt if p(l, r) => l :: mergeListsWithHeld(p)(Right(r))(lt, rt)
case r :: rt => r :: mergeListsWithHeld(p)(Left(l))(lt, rt)
}
}然后:
scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) =>
| mergeListsWith[Int](_ < _)(ls.sorted, rs.sorted) == (ls ++ rs).sorted
| }.check
+ OK, passed 100 tests.好的,看起来很好。我们可以有更好的方法来编写列表,但是这个实现与我们需要为Process做的事情相匹配。
实现
下面是scalaz-stream的大致等价物:
import scalaz.{ -\/, \/, \/- }
import scalaz.stream.Process.{ awaitL, awaitR, emit }
import scalaz.stream.{ Process, Tee, tee }
def mergeWithHeld[A](p: (A, A) => Boolean)(held: A \/ A): Tee[A, A, A] =
held.fold(_ => awaitR[A], _ => awaitL[A]).awaitOption.flatMap {
case None =>
emit(held.merge) ++ held.fold(_ => tee.passL, _ => tee.passR)
case Some(next) if p(next, held.merge) =>
emit(next) ++ mergeWithHeld(p)(held)
case Some(next) =>
emit(held.merge) ++ mergeWithHeld(p)(
held.fold(_ => \/-(next), _ => -\/(next))
)
}
def mergeWith[A](p: (A, A) => Boolean): Tee[A, A, A] =
awaitL[A].awaitOption.flatMap {
case None => tee.passR
case Some(l) => awaitR[A].awaitOption.flatMap {
case None => emit(l) ++ tee.passL
case Some(r) if p(l, r) => emit(l) ++ mergeWithHeld(p)(\/-(r))
case Some(r) => emit(r) ++ mergeWithHeld(p)(-\/(l))
}
}让我们再检查一遍:
scala> org.scalacheck.Prop.forAll { (ls: List[Int], rs: List[Int]) =>
| Process.emitAll(ls.sorted).tee(Process.emitAll(rs.sorted))(
| mergeWith(_ < _)
| ).toList == (ls ++ rs).sorted
| }.check
+ OK, passed 100 tests.我不会在没有更多测试的情况下将其投入生产,但看起来它是有效的。
发布于 2017-04-30 13:09:28
正如Travis Brown建议的那样,您必须实现一个自定义的tee。这是我的发球台implementation:
/*
A tee which sequentially compares elements from left and right
and passes an element from left if predicate returns true, otherwise
passes an element from right.
*/
def predicateTee[A](predicate: (A, A) => Boolean): Tee[A, A, A] = {
def go(stack: Option[A \/ A]): Tee[A, A, A] = {
def stackEither(l: A, r: A) =
if (predicate(l, r)) emit(l) ++ go(\/-(r).some) else emit(r) ++ go(-\/(l).some)
stack match {
case None =>
awaitL[A].awaitOption.flatMap { lo =>
awaitR[A].awaitOption.flatMap { ro =>
(lo, ro) match {
case (Some(l), Some(r)) => stackEither(l, r)
case (Some(l), None) => emit(l) ++ passL
case (None, Some(r)) => emit(r) ++ passR
case _ => halt
}
}
}
case Some(-\/(l)) => awaitR[A].awaitOption.flatMap {
case Some(r) => stackEither(l, r)
case None => emit(l) ++ passL
}
case Some(\/-(r)) => awaitL[A].awaitOption.flatMap {
case Some(l) => stackEither(l, r)
case None => emit(r) ++ passR
}
}
}
go(None)
}
val p1: Process[Task, Int] = Process(1, 2, 4, 5, 9, 10, 11)
val p2: Process[Task, Int] = Process(0, 3, 7, 8, 6)
p1.tee(p2)(predicateTee(_ < _)).runLog.run
//res0: IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 7, 8, 6, 9, 10, 11)https://stackoverflow.com/questions/38353244
复制相似问题