FunDA(7)- Reactive Streams to fs2 Pull Streams

    Reactive-Stream不只是简单的push-model-stream, 它还带有“拖式”(pull-model)性质。这是因为在Iteratee模式里虽然理论上由Enumerator负责主动推送数据,实现了push-model功能。但实际上Iteratee也会根据自身情况,通过提供callback函数通知Enumerator可以开始推送数据,这从某种程度上也算是一种pull-model。换句话讲Reactive-Streams是通过push-pull-model来实现上下游Enumerator和Iteratee之间互动的。我们先看个简单的Iteratee例子:

def showElements: Iteratee[Int,Unit] = Cont {
  case Input.El(e) =>
     println(s"EL($e)")
     showElements
  case Input.Empty => showElements
  case Input.EOF =>
     println("EOF")
     Done((),Input.EOF)
}                                                 //> showElements: => play.api.libs.iteratee.Iteratee[Int,Unit]
val enumNumbers = Enumerator(1,2,3,4,5)           //> enumNumbers  : play.api.libs.iteratee.Enumerator[Int] = play.api.libs.iteratee.Enumerator$$anon$19@47f6473

enumNumbers |>> showElements                      //> EL(1)
                                                  //| EL(2)
                                                  //| EL(3)
                                                  //| EL(4)
                                                  //| EL(5)
                                                  //| res0: scala.concurrent.Future[play.api.libs.iteratee.Iteratee[Int,Unit]] = Success(Cont(<function1>))

我们看到:enumNumbers |>> showElements立刻启动了运算。但并没有实际完成数据发送,因为showElements并没有收到Input.EOF。首先,我们必须用Iteratee.run来完成运算:

val it = Iteratee.flatten(enum |>> consumeAll).run//> El(1)
                                                  //| El(2)
                                                  //| El(3)
                                                  //| El(4)
                                                  //| El(5)
                                                  //| El(6)
                                                  //| El(7)
                                                  //| El(8)
                                                  //| EOF
                                                  //| it  : scala.concurrent.Future[Int] = Success(99)

这个run函数是这样定义的:

/**
   * Extracts the computed result of the Iteratee pushing an Input.EOF if necessary
   * Extracts the computed result of the Iteratee, pushing an Input.EOF first
   * if the Iteratee is in the [[play.api.libs.iteratee.Cont]] state.
   * In case of error, an exception may be thrown synchronously or may
   * be used to complete the returned Promise; this indeterminate behavior
   * is inherited from fold().
   *
   *  @return a [[scala.concurrent.Future]] of the eventually computed result
   */
  def run: Future[A] = fold({
    case Step.Done(a, _) => Future.successful(a)
    case Step.Cont(k) => k(Input.EOF).fold({
      case Step.Done(a1, _) => Future.successful(a1)
      case Step.Cont(_) => sys.error("diverging iteratee after Input.EOF")
      case Step.Error(msg, e) => sys.error(msg)
    })(dec)
    case Step.Error(msg, e) => sys.error(msg)
  })(dec)

再一个问题是:enumNumbers |>> showElements是个封闭的运算,我们无法逐部分截取数据流,只能取得整个运算结果。也就是说如果我们希望把一个Enumerator产生的数据引导到fs2 Stream的话,只能在所有数据都读入内存后才能实现了。这样就违背了使用Reactive-Streams的意愿。那我们应该怎么办?一个可行的方法是使用一个存储数据结构,用两个线程,一个线程里Iteratee把当前数据存入数据结构,另一个线程里fs2把数据取出来。fs2.async.mutable包提供了个Queue类型,我们可以用这个Queue结构来作为Iteratee与fs2之间的管道:Iteratee从一头把数据压进去(enqueue),fs2从另一头把数据取出来(dequeue)。

我们先设计enqueue部分,这部分是在Iteratee里进行的:

def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
   case Input.EOF =>
       q.enqueue1(None).unsafeRun
       Done((),Input.EOF)
   case Input.Empty => enqueueTofs2(q)
   case Input.El(e) =>
       q.enqueue1(Some(e)).unsafeRun
       enqueueTofs2(q)
}    //> enqueueTofs2: (q: fs2.async.mutable.Queue[fs2.Task,Option[Int]])play.api.libs.iteratee.Iteratee[Int,Unit]

先分析一下这个Iteratee:我们直接把enqueueTofs2放入Cont状态,也就是等待接受数据状态。当收到数据时运行q.enqueue1把数据塞入q,然后不断循环运行至收到Input.EOF。注意:q.enqueue1(Some(e)).unsafeRun是个同步运算,在未成功完成数据enqueue1的情况下会一直占用线程。所以,q另一端的dequeue部分必须是在另一个线程里运行,否则会造成整个程序的死锁。fs2的Queue类型款式是:Queue[F,A],所以我们必须用Stream.eval来对这个Queue进行函数式的操作:

val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
    //run Enumerator-Iteratee and enqueue data in thread 1
    //dequeue data and en-stream in thread 2(current thread)
  }

因为Stream.eval运算结果是Stream[Task,Int],所以我们可以得出这个flatMap内的函数款式 Queue[Task,Option[Int]] => Stream[Task,Int]。下面我们先考虑如何实现数据enqueue部分:这部分是通过Iteratee的运算过程产生的。我们提到过这部分必须在另一个线程里运行,所以可以用Task来选定另一线程如下:

    Task { Iteratee.flatten(enumerator |>> pushData(q)).run }.unsafeRunAsyncFuture()

现在这个Task就在后面另一个线程里自己去运算了。但它的运行进展则会依赖于另一个线程中dequeue数据的进展。我们先看看fs2提供的两个函数款式:

/** Repeatedly calls `dequeue1` forever. */
  def dequeue: Stream[F, A] = Stream.bracket(cancellableDequeue1)(d => Stream.eval(d._1), d => d._2).repeat

/**
   * Halts the input stream at the first `None`.
   *
   * @example {{{
   * scala> Stream[Pure, Option[Int]](Some(1), Some(2), None, Some(3), None).unNoneTerminate.toList
   * res0: List[Int] = List(1, 2)
   * }}}
   */
  def unNoneTerminate[F[_],I]: Pipe[F,Option[I],I] =
    _ repeatPull { _.receive {
      case (hd, tl) =>
        val out = Chunk.indexedSeq(hd.toVector.takeWhile { _.isDefined }.collect { case Some(i) => i })
        if (out.size == hd.size) Pull.output(out) as tl
        else if (out.isEmpty) Pull.done
        else Pull.output(out) >> Pull.done
    }}

刚好,dequeue产生Stream[F,A]。而unNoneTerminate可以根据Stream(None)来终止运算。现在我们可以把这个Reactive-Streams到fs2-pull-streams转换过程这样来定义:

implicit val strat = Strategy.fromFixedDaemonPool(4)
                                                  //> strat  : fs2.Strategy = Strategy
val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture
  pipe.unNoneTerminate(q.dequeue)
}   //> fs2Stream  : fs2.Stream[fs2.Task,Int] = attemptEval(Task).flatMap(<function1>).flatMap(<function1>)

现在这个stream应该已经变成fs2.Stream[Task,Int]了。我们可以用前面的log函数来试运行一下:

def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
                                                  //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
    
fs2Stream.through(log("")).run.unsafeRun          //> > 1
                                                  //| > 2
                                                  //| > 3
                                                  //| > 4
                                                  //| > 5

我们成功的把Iteratee的Reactive-Stream转化成fs2的Pull-Model-Stream。

下面是这次讨论的源代码:

import play.api.libs.iteratee._
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.collection.mutable._
import fs2._
object iteratees {
def showElements: Iteratee[Int,Unit] = Cont {
  case Input.El(e) =>
     println(s"EL($e)")
     showElements
  case Input.Empty => showElements
  case Input.EOF =>
     println("EOF")
     Done((),Input.EOF)
}
val enumNumbers = Enumerator(1,2,3,4,5)

enumNumbers |>> showElements

Iteratee.flatten(enumNumbers |>> showElements).run


def enqueueTofs2(q: async.mutable.Queue[Task,Option[Int]]): Iteratee[Int,Unit] = Cont {
   case Input.EOF =>
       q.enqueue1(None).unsafeRun
       Done((),Input.EOF)
   case Input.Empty => enqueueTofs2(q)
   case Input.El(e) =>
       q.enqueue1(Some(e)).unsafeRun
       enqueueTofs2(q)
}
implicit val strat = Strategy.fromFixedDaemonPool(4)
val fs2Stream: Stream[Task,Int] = Stream.eval(async.boundedQueue[Task,Option[Int]](2)).flatMap { q =>
  Task(Iteratee.flatten(enumNumbers |>> enqueueTofs2(q)).run).unsafeRunAsyncFuture
  pipe.unNoneTerminate(q.dequeue)
}

def log[A](prompt: String): Pipe[Task,A,A] =
    _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
    
fs2Stream.through(log("")).run.unsafeRun
 
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编程

记住这35个大神级别的Python操作,足够精简上千行代码!

从我开始学习python的时候,我就开始自己总结一个python小技巧的集合。后来当我什么时候在Stack Overflow或者在某个开源软件里看到一段很酷代码...

2897
来自专栏.NET技术

C#系列之String和StringBuilder

      首先和博园的各位打声招呼,小弟在博园呆了也有一年多了。平常一有时间就会过来看看文章,学习各位的经验,现在养成了一种一天不来博园,心里就不踏实的习惯,...

984
来自专栏数据结构与算法

3339: Rmq Problem

Description image.png Input image.png Output image.png Sample Input 7 5 ...

33211
来自专栏菩提树下的杨过

scala 学习笔记(06) OOP(下)多重继承 及 AOP

一、多继承 上篇trait中,已经看到了其用法十分灵活,可以借此实现类似"多重继承"的效果,语法格式为: class/trait A extends B wit...

19410
来自专栏ACM算法日常

整除的尾数(整除问题) - HDU 2099

最近在写POJ 2449,暂时还没写完,目前上卷的题目基本都涉及到了,后面会接着加强,特别是动态规划方面的题目,因为比较灵活也比较难。

671
来自专栏算法与数据结构

数据结构 栈&队列

2-4 依次在初始为空的队列中插入元素a,b,c,d以后,紧接着做了两次删除操作,此时的队头元素是( ) 删除,移动头指针; 增加,移动尾指针; 删除a,b ,...

36410
来自专栏代码世界

题型分析

1、 简单让A、B的值交换 a,b = 1,2 print(a,b) a = 1 b = 2 a,b = b,a print(a,b) a,b=[1,2],[5...

34010
来自专栏Python自动化测试

python接口测试之序列化与反序列化(四)

在python中,序列化可以理解为:把python的对象编码转换为json格式的字符串,反序列化可以理解为:把json格式字符串解码为python...

1094
来自专栏一个会写诗的程序员的博客

《Kotin 编程思想·实战》

Xtend是Eclipse推出的一个新的JVM语言,并无意替代Java,而是以己之长补Java之短,精简代码,无类型,改进可读和维护。Eclipse Xtend...

1073
来自专栏林德熙的博客

win10 uwp xaml 绑定接口

早上快乐 就在你的心问了我一个问题,他使用的属性是显式继承,但是无法在xaml绑定

582

扫码关注云+社区