Scalaz(46)- scalaz-stream 基础介绍

    scalaz-stream是一个泛函数据流配件库(functional stream combinator library),特别适用于函数式编程。scalar-stream是由一个以上各种状态的Process串联组成。stream代表一连串的元素,可能是自动产生或者由外部的源头输入,如:一连串鼠标位置;文件中的文字行;数据库记录;又或者一连串的HTTP请求等。Process就是stream转换器(transducer),它可以把一种stream转换成另一种stream。Process的类型款式如下:

sealed trait Process[+F[_], +O]

其中F是个高阶类,是一种算法,O是Process的运算值。从类型款式上看Process是个对O类型值进行F运算的节点,那么scalaz-stream就应该是个运算流了。Process包含以下几种状态:

case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O]

case class Await[+F[_], A, +O](
    req: F[A]
    , rcv: (EarlyCause \/ A) => Trampoline[Process[F, O]] @uncheckedVariance
    , preempt : A => Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) => Trampoline.delay(halt:Process[F,Nothing])
    ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] {
...
}
case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing]

case class Append[+F[_], +O](
    head: HaltEmitOrAwait[F, O]
    , stack: Vector[Cause => Trampoline[Process[F, O]]] @uncheckedVariance
    ) extends Process[F, O] {
...
}   

scalaz-stream是个主动读取模式的流(pull model stream),Process转换stream的方式不是以Stream[I] => Stream[O]这种函数方式,而是一种状态转换方式进行(state transition),所以这些状态就等于向一个驱动程序发出的请求:

Emit[+O]:请求发一个O值

Await[+F[_],A,+O]:要求运算F[A],得出F[A]的结果A后输入函数rcv再运算得出下一个Process状态。这个是flatMap函数的结构化版本

Halt:停止发送

Append:连接前后两个Process

可以看到Emit,Await,Halt,Append都是Process类型的结构化状态。其中Await就是flatMap函数的结构化,Emit就像Return,所以Process就是一个Free Monad。

Emit的作用是发出一个O值,Await的作用是运算F然后连接下一个Process, Append的作用则是把前一个Process的信息传递到下一个Process。Await和Append分别是不同方式的Process连接方式。

Process又分以下几类:

  type Process0[+O] = Process[Nothing,O]

  /**
   * A single input stream transducer. Accepts input of type `I`,
   * and emits values of type `O`.
   */
  type Process1[-I,+O] = Process[Env[I,Any]#Is, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * the 'left' (of type `I`) or the 'right' (of type `I2`).
   * `Process1[I,O] <: Tee[I,I2,O]`.
   */
  type Tee[-I,-I2,+O] = Process[Env[I,I2]#T, O]

  /**
   * A stream transducer that can read from one of two inputs,
   * non-deterministically.
   */
  type Wye[-I,-I2,+O] = Process[Env[I,I2]#Y, O]

  /**
   * An effectful sink, to which we can send values. Modeled
   * as a source of effectful functions.
   */
  type Sink[+F[_],-O] = Process[F, O => F[Unit]]

  /**
   * An effectful channel, to which we can send values and
   * get back responses. Modeled as a source of effectful
   * functions.
   */
  type Channel[+F[_],-I,O] = Process[F, I => F[O]]

Process[F[_],O]:source:运算流源点,由此发送F[O]运算

Process0[+O]:>>>Process[Nothing,+O]:source:纯数据流源点,发送O类型元素

Process1[-I,+O]:一对一的数据转换节点:接收一个I类型输入,经过处理转换成O类型数据输出

Tee[-I1,-I2,+O]:二对一的有序输入数据转换节点:从左右两边一左一右有顺接受I1,I2类型输入后转换成O类型数据输出

Wye[-I1,-I2,+O]:二对一的无序输入数据转换节点:不按左右顺序,按上游数据发送情况接受I1,I2类型输入后转换成O类型数据输出

Sink[+F[_],-O]:运算终点,在此对O类型数据进行F运算,不返回值:O => F[Unit]

Channel[+F[_],-I,O]:运算终点,接受I类型输入,进行F运算后返回F[O]:I => F[O]

以下是一些简单的Process构建方法:

1  Process.emit(1)                                  //> res0: scalaz.stream.Process0[Int] = Emit(Vector(1))
2  Process.emitAll(Seq(1,2,3))                      //> res1: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3))
3  Process.halt                                     //> res2: scalaz.stream.Process0[Nothing] = Halt(End)
4  Process.range(1,2,3)           //> res3: scalaz.stream.Process0[Int] = Append(Halt(End),Vector(<function1>))

这些是纯数据流的构建方法。scalaz-stream通常把Task作为F运算,下面是Task运算流的构建或者转换方法:

1 val p: Process[Task,Int] = Process.emitAll(Seq(1,2,3))  //> p  : scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
2  Process.range(1,2,3).toSource                   //> res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Append(Halt(End),Vector(<function1>))
3  //把F[A]升格成Process[F,A]
4 Process.eval(Task.delay {5 * 8})                 //> res5: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await(scalaz.concurrent.Task@56aac163,<function1>,<function1>)

对stream的Process进行运算有下面几种run方法:

/**
   * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
final def runFoldMap[F2[x] >: F[x], B](f: O => B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = {
...}
/**
   * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in
   * which we can catch exceptions. This function is not tail recursive and
   * relies on the `Monad[F]` to ensure stack safety.
   */
final def runLog[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists. */
final def runLast[F2[x] >: F[x], O2 >: O](implicit F: Monad[F2], C: Catchable[F2]): F2[Option[O2]] = {
...}
/** Run this `Process` solely for its final emitted value, if one exists, using `o2` otherwise. */
final def runLastOr[F2[x] >: F[x], O2 >: O](o2: => O2)(implicit F: Monad[F2], C: Catchable[F2]): F2[O2] =
    runLast[F2, O2] map { _ getOrElse o2 }
/** Run this `Process`, purely for its effects. */
final def run[F2[x] >: F[x]](implicit F: Monad[F2], C: Catchable[F2]): F2[Unit] =
    F.void(drain.runLog(F, C))

这几个函数都返回F2运算,如果F2是Task的话那么我们就可以用Task.run来获取结果值: 

 1  //runFoldMap就好比Monoid的sum
 2  p.runFoldMap(identity).run                       //> res6: Int = 6
 3  p.runFoldMap(i => i * 2).run                     //> res7: Int = 12
 4  p.runFoldMap(_.toString).run                     //> res8: String = 123
 5  //runLog把收到的元素放入vector中
 6  p.runLog.run                                     //> res9: Vector[Int] = Vector(1, 2, 3)
 7  //runLast取最后一个元素,返回Option
 8  p.runLast.run                                    //> res10: Option[Int] = Some(3)
 9  Process.halt.toSource.runLast.run                //> res11: Option[Nothing] = None
10  Process.halt.toSource.runLastOr(65).run          //> res12: Int = 65
11  //run只进行F的运算,放弃所有元素
12  p.run      //> res13: scalaz.concurrent.Task[Unit] = scalaz.concurrent.Task@26b3fd41
13  p.run.run  //Task[Unit] 返回Unit
14  Process.emit(print("haha")).toSource.run.run     //> haha

与List和Stream操作相似,我们同样可以对scalar-stream Process施用同样的操作函数,也就是一些stream转换函数:

1  p.take(2).runLog.run                             //> res14: Vector[Int] = Vector(1, 2)
2  p.filter {_ > 2}.runLog.run                      //> res15: Vector[Int] = Vector(3)
3  p.last.runLog.run                                //> res16: Vector[Int] = Vector(3)
4  p.drop(1).runLog.run                             //> res17: Vector[Int] = Vector(2, 3)
5  p.exists{_ > 5}.runLog.run                       //> res18: Vector[Boolean] = Vector(false)

以上这些函数与scala标准库的stream很相似。再看看map,flatMap吧:

1  p.map{i => s"Int:$i"}.runLog.run                 //> res19: Vector[String] = Vector(Int:1, Int:2, Int:3)
2  p.flatMap{i => Process(i,i-1)}.runLog.run        //> res20: Vector[Int] = Vector(1, 0, 2, 1, 3, 2)

仔细检查可以看出来上面的这些转换操作都是针对Process1类型的,都是元素在流通过程中得到转换。我们会在下篇讨论中介绍一些更复杂的Process操作,如:Sink,Tee,Wyn...,然后是scalaz-stream的具体应用

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏java思维导图

spring框架思维导图,简约概括

Spring之旅 简化java开发 激发pojo的潜能 依赖注入 应用切面 使用模版消除样板式代码 容纳你的bean 与应用上下文共事 bean的生命周期 俯翰...

2726
来自专栏H2Cloud

C++ 多进程并发框架FFLIB之Tutorial

      FFLIB框架是为简化分布式/多进程并发而生的。它起始于本人尝试解决工作中经常遇到的问题如消息定义、异步、多线程、单元测试、性能优化等。基本介绍可以...

4396
来自专栏java相关

Memcached相关内容总结

Memcached使用Slab Allocator机制分配和管理内存,这种分配机制可以减少内存碎片的产生,减轻系统管理内存的负担。

813
来自专栏达观数据

达观数据应对大规模消息数据的处理经验

达观数据是为企业提供大数据处理、个性化推荐系统服务的知名公司,在应对海量数据处理时,积累了大量实战经验。其中达观数据在面对大量的数据交互和消息处理时,使用了称为...

3568
来自专栏Java开发者杂谈

分布式改造剧集2---DIY分布式锁

1447
来自专栏技术博文

Memcached 及 Redis 架构分析和比较

Memcached和Redis作为两种Inmemory的key-value数据库,在设计和思想方面有着很多共通的地方,功能和应用方面在很多场合下(作为分布式缓存...

2723
来自专栏风中追风

分布式环境下的解决方案——分布式锁

锁是一个抽象的概念,锁的实现,需要依存于一个可以存储锁的空间。在多线程中是内存,在多进程中是内存或者磁盘。更重要的是,这个空间是可以被访问到的。多线程中,不同的...

3198
来自专栏逸鹏说道

开发人员为何需要企业服务总线?

引言 重要的应用程序很少是单独存在的;如果不能与其他的应用程序一起使用,应用程序将难以发挥很大的作用。面向服务的体系结构往往将应用程序集成在一起,这样它们就可以...

2435
来自专栏Java架构解析

微服务网关Zuul迁移到Spring Cloud Gateway

本文将会介绍将微服务网关由Zuul迁移到Spring Cloud Gateway。

1690
来自专栏个人分享

实时交互平台流程与技术分析

  最近几个月一直在做基于storm的流式处理,索性整理下所有的知识点与技术知识。

531

扫码关注云+社区