Scalaz(25)- Monad: Monad Transformer-叠加Monad效果

中间插播了几篇scalaz数据类型,现在又要回到Monad专题。因为FP的特征就是Monad式编程(Monadic programming),所以必须充分理解认识Monad、熟练掌握Monad运用。曾经看到一段对Monad的描述:“Monadic for-comprehension就是一种嵌入式编程语言,由它的Monad提供它的语法”。但如果每一种Monad的for-comprehension都独立提供一套语法的话,这种编程语言就显得十分单调、功能简单了。那么既然是FP,我们应该可以通过函数组合(function composition)来把很多简单的for-comprehension组合成一套完善的编程语言吧?比如这样:Option[A] >>> IO[Option[A]] >>> IO[Either[String,Option[A]]。恰恰,Monad是不支持函数组合的。先了解一下函数组合:Functor是可以组合的,我们可以把fa和fb组合成一个更复杂的Functor fab,我们来验证一下:

 def composeFunctor[M[_],N[_]](fa: Functor[M], fb: Functor[N]): Functor[({type mn[x] = M[N[x]]})#mn] =
   new Functor[({type mn[x] = M[N[x]]})#mn] {
        def map[A,B](fab: M[N[A]])(f: A => B): M[N[B]] =
           fa.map(fab)(n => fb.map(n)(f))
   }                                              //> composeFunctor: [M[_], N[_]](fa: scalaz.Functor[M], fb: scalaz.Functor[N])s
                                                  //| calaz.Functor[[x]M[N[x]]]

我们来解释一下:如果M,N都是Functor,那么M[N[A]]也是Functor,我们可以用M[N[A]].map来运算A值。看看下面的例子:

1 val stringlen: String => Int = _.length           //> stringlen  : String => Int = <function1>
2 val optionInList = List("1".some,"12".some,"123".some)
3                                                   //> optionInList  : List[Option[String]] = List(Some(1), Some(12), Some(123))
4 
5 val mnFunctor = composeFunctor(Functor[List],Functor[Option])
6                                                   //> mnFunctor  : scalaz.Functor[[x]List[Option[x]]] = Exercises.monadtrans$$ano
7                                                   //| nfun$main$1$$anon$1@130d63be
8 mnFunctor.map(optionInList)(stringlen)            //> res3: List[Option[Int]] = List(Some(1), Some(2), Some(3))

那么我们需要的Monad组合应该是这样的:M[N[A]],M,N都是Monad,如:Either[String,Option[A]],甚至是M[N[P[A]]],三层Monad。可惜,不是所有Monad都支持函数组合的,看下面:

 def composeMonad[M[_],N[_]](ma: Monad[M], mb: Monad[N]): Monad[({type mn[x] = M[N[x]]})#mn] =
   new Monad[({type mn[x] = M[N[x]]})#mn] {
     def point[A](a: => A) = ma.point(mb.point(a))
        def bind[A,B](mab: M[N[A]])(f: A => M[N[B]]): M[N[B]] =
           ??? ...
   }

实现M[N[A]].bind是不可能的,大家可以试试。这就堵死了函数组合这条路。难道我们就无法使用M[N[A]]这样的for-comprehension了吗?毕竟像Either[String,Option[A]]这样的组合是很普遍的啊,比如说从数据库里读取这样的动作,有几种可能:取得数据、无数据None、发生错误。无论如何我们先试试用for-comprehension:

1 type Result[A] = String \/ Option[A]
2 val result: Result[Int] = 62.some.right           //> result  : Exercises.monadtxns.Result[Int] = \/-(Some(62))
3 for {
4     optionValue <- result
5 } yield {
6   for {
7       valueA <- optionValue
8   } yield valueA + 18                             //> res0: scalaz.\/[String,Option[Int]] = \/-(Some(80))
9 }

从上面可以了解我们必须用两层for-comprehension才能运算A值。那么可想而知如果是M[N[P[A]]]就需要三层for-comprehension了。这就是所谓的“下阶梯式算法”(stair-stepping)。表面上来看stair-stepping会产生复杂臃肿的代码,丧失FP的精简优雅风格。但想深一层,如果其中一个Monad是会产生副作用的如IO[Option[A]],那么上面的例子就变成这样:

1 for {
2   optionData <- IO
3 } yield {
4   for {
5     data <- optionData
6   } yield Process(data)
7 }

我们看到在第一层运算里进行了IO运算,产生了副作用。那么以上的代码就不再是纯代码了,无法保障函数组合。也就是说stair-stepping会产生不纯代码,违背了FP要求。之前我们曾经讨论过 ReaderWriterState Monad,它是Reader,Writer,State三个Monad的组合。在它的for-comprehension里的运算结果类型是ReaderWriterState一种,所以没有stair-stepping忧虑。但我们必须先创建一个新的类型(不是通过函数组合的新类型)。难道我们在使用不同要求的for-comprehension时都需要重新创建一个新类型吗,这样不就损失了FP的代码重复使用特点了吗?不,scalaz提供的Monad Transformer就是一个有效的解决方案。

scalaz为很多type class提供了Monad Transformer,它们都以T尾缀命名如OptionT、EitherT、StateT...,我们可以通过Monad Transformer来灵活地组合Monad。以OptionT为例:

1 type Error[A] = \/[String, A]
2 type Result[A] = OptionT[Error, A]
3 
4 val result: Result[Int] = 62.point[Result]        //> result  : Exercises.monadtxns.Result[Int] = OptionT(\/-(Some(62)))
5 val transformed =
6   for {
7     value <- result
8   } yield value + 18                              //> transformed  : scalaz.OptionT[Exercises.monadtxns.Error,Int] = OptionT(\/-(S
9                                                   //| ome(80)))

现在,运算A只需要一层context了。Result就是通过Monad Transformer产生的新类型。在上面的类型构建里,OptionT就是一个Monad Transformer、Error是固定了Left类型的Either。因为Either有两个类型参数,我们实际上也可以直接用type lambda来表示Result[A]:

type Result[A] = OptionT[({type l[x] = \/[String,x]})#l,A]

不过这样写不但复杂,而且会影响编译器的类型推导(compiler type inference)。

值得注意的是,Monad Transformer 类型的构建是由内向外反向的。比如上面的例子中OptionT是个Monad Transformer,它的类型款式是OptionT[M[_],A]。OptionT实际上是用来构建M[Option[A]],在我们的例子里就是Either[Option[A]]。我们来看看一些常用Monad Transformer的类型款式:

final case class OptionT[F[_], A](run: F[Option[A]]) {
...
final case class EitherT[F[_], A, B](run: F[A \/ B]) {
...
final case class ListT[F[_], A](run: F[List[A]]){
...
trait IndexedStateT[F[_], -S1, S2, A] { self =>
  /** Run and return the final value and state in the context of `F` */
  def apply(initial: S1): F[(S2, A)]

可以看到,Monad Transformer 的主要作用就在构成run这个我们称为嵌入值了。F可以是任何普通Monad。在上面的例子就变成了:

OptionT[Either,A](run: Either[Option[A]]),这个Either[Option[A]]就是我们的目标类型。而我们在操作时如在for-comprehension中运算时使用的类型则必须统一为OptionT[Either,A]。

我们如何去构建Monad Transformer类型值呢?我们可以用Applicative[MT].point或者直接用构建器方式如OptionT(...)

//point升格
Applicative[Result].point(62)                     //> res0: Exercises.monadtxns.Result[Int] = OptionT(\/-(Some(62)))
//简写版本
62.point[Result]                                  //> res1: Exercises.monadtxns.Result[Int] = OptionT(\/-(Some(62)))
//会产生错误结果
None.point[Result]                                //> res2: Exercises.monadtxns.Result[None.type] = OptionT(\/-(Some(None)))
"Oh,shit!".left.point[Result]                     //> res3: Exercises.monadtxns.Result[scalaz.\/[String,Nothing]] = OptionT(\/-(So
                                                  //| me(-\/(Oh,shit!))))
//用构建器
OptionT((None: Option[Int]).point[Error])         //> res4: scalaz.OptionT[Exercises.monadtxns.Error,Int] = OptionT(\/-(None))
OptionT(none[Int].point[Error])                   //> res5: scalaz.OptionT[Exercises.monadtxns.Error,Int] = OptionT(\/-(None))
OptionT("Oh,shit!".left: Error[Option[Int]])      //> res6: scalaz.OptionT[Exercises.monadtxns.Error,Int] = OptionT(-\/(Oh,shit!))

与重新构建另一个类型不同的是,通过Monad Transformer叠加Monad组合形成类型的操作依然使用各组成Monad的操作函数,这些函数运算结果类型任然是对应的Monad类型,所以需要一些升格函数(lifting functions)来统一类型。而重建类型则继承了组成Monad的操作函数,它们的运算结果类型都与新建的这个类型一致。下面我们还是用上面的这个Either+Option例子来示范。我们把Either和Option叠加后按照不同顺序可以产生Either[Option[A]]或者Option[Either[A]]两种结果类型,所以叠加顺序是非常重要的,因为这两种类型代表着截然不同的意义:Either[Option[A]]代表一个运算结果可以是成功right或者失败left,如果运算成功则返回一个结果或空值;而Option[Either[A]]从字面上理解好像是一个运算可以返回一个成功或失败的运算又或者返回空值,应该是没有任何意义的一个类型。前面我们提到过用Monad Transformer叠加Monad是由内向外反方向的:获取Either[Option[A]]就需要用OptionT[Either,A]。而且我们需要把Either和Option升格成OptionT[Either,A],看下面的示范:

 1 type Error[A] = \/[String, A]
 2 type Result[A] = OptionT[Error, A]
 3 
 4 def getString: Option[String] = "Hello ".some     //> getString: => Option[String]
 5 def getResult: Error[String] = "how are you!".right
 6                                                   //> getResult: => Exercises.monadtxns.Error[String]
 7 val prg: Result[String] = for {
 8   s1 <- OptionT.optionT(getString.point[Error])
 9   s2 <- "World,".point[Result]
10   s3 <- getResult.liftM[OptionT]
11 } yield s1+s2+s3                                  //> prg  : Exercises.monadtxns.Result[String] = OptionT(\/-(Some(Hello World,how
12                                                   //|  are you!)))
13 prg.run                                           //> res0: Exercises.monadtxns.Error[Option[String]] = \/-(Some(Hello World,how a
14                                                   //| re you!))

首先,我们避免了stair-stepping,直接运算s1+s2+s3。point、OptionT.optionT、liftM分别对String,Option,Either进行类型升格形成Result[String] >>> OptionT[Error,String]。升格函数源代码如下:

trait ApplicativeIdV[A] extends Ops[A] {
    def point(implicit F: Applicative[F]): F[A] = Applicative[F].point(self)
...
trait OptionTFunctions {
  def optionT[M[_]] = new (({type λ[α] = M[Option[α]]})#λ ~> ({type λ[α] = OptionT[M, α]})#λ) {
    def apply[A](a: M[Option[A]]) = new OptionT[M, A](a)
  }
...
final class MonadOps[F[_],A] private[syntax](val self: F[A])(implicit val F: Monad[F]) extends Ops[F[A]] {
  ////

  def liftM[G[_[_], _]](implicit G: MonadTrans[G]): G[F, A] = G.liftM(self)
...

再看看组合的Monad是否实现了功能叠加,如果我们加个None转换:

1 val prg: Result[String] = for {
2   s1 <- OptionT.optionT(getString.point[Error])
3   s0 <- OptionT(none[String].point[Error])  
4   s2 <- "World,".point[Result]
5   s3 <- getResult.liftM[OptionT]
6 } yield s1+s2+s3                                  //> prg  : Exercises.monadtxns.Result[String] = OptionT(\/-(None))
7 prg.run                                           //> res0: Exercises.monadtxns.Error[Option[String]] = \/-(None)

加个Left效果:

1 val prg: Result[String] = for {
2   s1 <- OptionT.optionT(getString.point[Error])
3   s0 <- OptionT("Catch Error!".left: Error[Option[String]])
4   s2 <- "World,".point[Result]
5   s3 <- getResult.liftM[OptionT]
6 } yield s1+s2+s3                                  //> prg  : Exercises.monadtxns.Result[String] = OptionT(-\/(Catch Error!))
7 prg.run                                           //> res0: Exercises.monadtxns.Error[Option[String]] = -\/(Catch Error!)

的确,用Monad Transformer组合Monad后可以实现成员Monad的效果叠加。

不过,在实际应用中两层以上的Monad组合还是比较普遍的。Monad Transformer本身就是Monad,可以继续与另一个Monad组合,只要用这个Monad的Transformer就行了。例如我们在上面的例子里再增加一层State,最终形成一个三层类型:State[Either[Option[A]]]。按照上面的经验,堆砌Monad是由内向外的,我们先组合 StateEither >>> StateT[Either,A],然后再得出组合:OptionT[StateEither,A]。我们来示范一下:

先重新命名(alias)一些类:

type StringEither[A] = String \/ A
type StringEitherT[M[_],A] = EitherT[M,String,A]
type IntState[A] = State[Int,A]
type IntStateT[M[_],A] = StateT[M,Int,A]
type StateEither[A] = StringEitherT[IntState,A]
type StateEitherOption[A] = OptionT[StateEither,A]

由Option,Either,State组合而成的Monad需要相关的升格函数(lifting functions):

//常量升格
val m: StateEitherOption[Int] = 3.point[StateEitherOption]
                                                  //> m  : Exercises.monad_txnfm.StateEitherOption[Int] = OptionT(EitherT(scalaz.p
                                                  //| ackage$StateT$$anon$1@4f638935))
//option类升格
val o: Option[Int] = 3.some                       //> o  : Option[Int] = Some(3)
val o1: StateEither[Option[Int]]= o.point[StateEither]
                                                  //> o1  : Exercises.monad_txnfm.StateEither[Option[Int]] = EitherT(scalaz.packag
                                                  //| e$StateT$$anon$1@694abbdc)

val o2: StateEitherOption[Int] = OptionT.optionT(o1)
                                                  //> o2  : Exercises.monad_txnfm.StateEitherOption[Int] = OptionT(EitherT(scalaz.
                                                  //| package$StateT$$anon$1@694abbdc))
//val o2: OptionT[StateEither,Int] = OptionT.optionT(o1)

//either类升格
val e: StringEither[Int] = 3.point[StringEither]  //> e  : Exercises.monad_txnfm.StringEither[Int] = \/-(3)
val e1: IntState[StringEither[Int]] = e.point[IntState]
                                                  //> e1  : Exercises.monad_txnfm.IntState[Exercises.monad_txnfm.StringEither[Int]
                                                  //| ] = scalaz.package$StateT$$anon$1@52bf72b5
val e2: StateEither[Int] = EitherT.eitherT(e1)    //> e2  : Exercises.monad_txnfm.StateEither[Int] = EitherT(scalaz.package$StateT
                                                  //| $$anon$1@52bf72b5)
//val e2: StringEitherT[IntState,Int] = EitherT.eitherT(e1)
val e3: StateEitherOption[Int] = e2.liftM[OptionT]//> e3  : Exercises.monad_txnfm.StateEitherOption[Int] = OptionT(EitherT(scalaz.
                                                  //| IndexedStateT$$anon$10@2d7275fc))
//val e3: OptionT[StateEither,Int] = e2.liftM[OptionT]
//state类升格
val s: IntState[Int] = get[Int]                   //> s  : Exercises.monad_txnfm.IntState[Int] = scalaz.package$State$$anon$3@7e0
                                                  //| 7db1f
val s1: StateEither[Int] = s.liftM[StringEitherT] //> s1  : Exercises.monad_txnfm.StateEither[Int] = EitherT(scalaz.IndexedStateT
                                                  //| $$anon$10@8f4ea7c)
//val s1: StringEitherT[IntState,Int] = s.liftM[StringEitherT]
val s2: StateEitherOption[Int] = s1.liftM[OptionT]//> s2  : Exercises.monad_txnfm.StateEitherOption[Int] = OptionT(EitherT(scalaz
                                                  //| .IndexedStateT$$anon$10@436813f3))
//val s2: OptionT[StateEither,Int] = s1.liftM[OptionT]
//把State升格成StateT
val s3: IntStateT[StringEither,Int] = get[Int].lift[StringEither]
                                                  //> s3  : Exercises.monad_txnfm.IntStateT[Exercises.monad_txnfm.StringEither,In
                                                  //| t] = scalaz.IndexedStateT$$anon$7@10e31a9a

上面又多介绍了StateT.lift, EitherT.eitherT两个升格函数:

  def lift[M[_]: Applicative]: IndexedStateT[({type λ[α]=M[F[α]]})#λ, S1, S2, A] = new IndexedStateT[({type λ[α]=M[F[α]]})#λ, S1, S2, A] {
    def apply(initial: S1): M[F[(S2, A)]] = Applicative[M].point(self(initial))
  }
...
trait EitherTFunctions {
  def eitherT[F[_], A, B](a: F[A \/ B]): EitherT[F, A, B] = EitherT[F, A, B](a)
...

我们在上面例子的基础上增加一层State效果后再试用一下这些升格函数:

 1 def getString: Option[String] = "Hello ".some     //> getString: => Option[String]
 2 def getResult: StringEither[String] = "how are you!".right[String]
 3                                                   //> getResult: => Exercises.monad_txnfm.StringEither[String]
 4 def modState(s:Int): IntState[Unit] = put(s)      //> modState: (s: Int)Exercises.monad_txnfm.IntState[Unit]
 5 val prg: StateEitherOption[String] = for {
 6   s1 <- OptionT.optionT(getString.point[StateEither])
 7   s2 <- "World,".point[StateEitherOption]
 8   s3 <- (EitherT.eitherT(getResult.point[IntState]): StateEither[String]).liftM[OptionT]
 9   _ <- (modState(99).liftM[StringEitherT]: StateEither[Unit]).liftM[OptionT]
10 } yield s1+s2+s3                                  //> prg  : Exercises.monad_txnfm.StateEitherOption[String] = OptionT(EitherT(sc
11                                                   //| alaz.IndexedStateT$$anon$10@158d2680))
12 prg.run                                           //> res0: Exercises.monad_txnfm.StateEither[Option[String]] = EitherT(scalaz.In
13                                                   //| dexedStateT$$anon$10@158d2680)

不错,类型对了,prg可以通过编译,但未免复杂了点。我花了许多时间去匹配这些类型,因为需要连续升格。可想而知,如果遇到四层以上的Monad组合,代码会复杂成怎样。其中重点还是在各种类型的升格。那我们还是回顾一下这些升格函数吧:

A.point[F[_]] >>> F[A]   "hi".point[Option] = Option[String] = Some("hi")

M[A].liftM[T[_[_],_]] >>> T[M,A]   List(3).liftM[OptionT] = OptionT[List,Int] = OptionT(List(Some(3)))

OptionT.optionT(M[Option[A]]) >>> OptionT[M,A]  OptionT.optionT(List(3.some)) = OptionT[List,Int] = OptionT(List(Some(3)

EitherT.eitherT(M[Either[A]]) >>> EitherT[M,A] EitherT.eitherT(List(3.right[String])) = EitherT(List(\/-(3))

State.lift[M[A]] >>> StateT[M,A]  get[Int].lift[Option] = StateT[Option,Int]

注意:以上采用了形象类型表述

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏深度学习入门与实践

【原】Learning Spark (Python版) 学习笔记(二)----键值对、数据读取与保存、共享特性

  本来应该上周更新的,结果碰上五一,懒癌发作,就推迟了 = =。以后还是要按时完成任务。废话不多说,第四章-第六章主要讲了三个内容:键值对、数据读取与保存与S...

2988
来自专栏Spark生态圈

[spark] RDD缓存源码解析

我们可以利用不同的存储级别存储每一个被持久化的RDD。可以存储在内存中,也可以序列化后存储在磁盘上等方式。Spark也会自动持久化一些shuffle操作(如re...

773
来自专栏美图数据技术团队

Spark Streaming | Spark,从入门到精通

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,...

421
来自专栏伦少的博客

SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

1526
来自专栏函数式编程语言及工具

Scalaz(12)- Monad:再述述flatMap,顺便了解MonadPlus

  在前面的几篇讨论里我们初步对FP有了些少了解:FP嘛,不就是F[A]吗?也是,FP就是在F[]壳子(context)内对程序的状态进行更改,也就是在F壳子(...

1767
来自专栏祝威廉

Spark 2.0 Structured Streaming 分析

Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式...

763
来自专栏nimomeng的自我进阶

Event官方文档

当系统传递一个touch event,首先会send到一个特定的view。对于touch view来讲,这个view就是被hitTest:withEve...

852
来自专栏岑玉海

Spark源码系列(三)作业运行过程

作业执行 上一章讲了RDD的转换,但是没讲作业的运行,它和Driver Program的关系是啥,和RDD的关系是啥? 官方给的例子里面,一执行collect方...

2714
来自专栏岑玉海

Spark源码系列(六)Shuffle的过程解析

Spark大会上,所有的演讲嘉宾都认为shuffle是最影响性能的地方,但是又无可奈何。之前去百度面试hadoop的时候,也被问到了这个问题,直接回答了不知道。...

3626
来自专栏猿天地

Netty-整合kryo高性能数据传输

前言 本篇文章是Netty专题的第三篇,前面2篇文章如下: 高性能NIO框架Netty入门篇 高性能NIO框架Netty-对象传输 Netty 是 开源的基于j...

57212

扫码关注云+社区