Scalaz(37)- Free :实践-DB Transaction free style

  我一直在不断的提示大家:FP就是Monadic Programming,是一种特殊的编程风格。在我们熟悉的数据库编程领域能不能实现FP风格呢?我们先设计一些示范例子来分析一下惯用的数据库编程过程:

 1 import scalaz._
 2 import Scalaz._
 3 import scala.language.higherKinds
 4 import scala.language.implicitConversions
 5 import com.jolbox.bonecp.BoneCP
 6 import com.jolbox.bonecp.BoneCPConfig
 7 import java.sql.Connection
 8 import java.sql.ResultSet
 9 
10 object freedbtxns {
11 def getTutorId(courseId: Int, conn: Connection): Int = {
12   val sqlString = "select TUTOR from COURSES where ID=" + courseId
13   conn.createStatement().executeQuery(sqlString).getInt("ID")
14 }  
15 def getTutorPay(courseId: Int, conn: Connection): Double = {
16   val sqlString = "select PAYAMT from COURSES where ID=" + courseId
17   conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
18 }  
19 def getStudentFee(courseId: Int, conn: Connection): Double = {
20   val sqlString = "select FEEAMT from COURSES where ID=" + courseId
21   conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
22 }  
23 def updateTutorPay(tutorId: Int, plusAmt: Double, conn: Connection): Unit = {
24   val sqlString = "update TUTORS set PAYABLE = PAYABLE+"+plusAmt.toString + " where ID=" + tutorId
25   conn.createStatement().executeUpdate(sqlString)
26 }  
27 def updateStudentFee(studentId: Int, plusAmt: Double, conn: Connection): Unit = {
28   val sqlString = "update STUDENTS set DUEAMT = DUEAMT+"+plusAmt.toString + " where ID=" + studentId
29   conn.createStatement().executeUpdate(sqlString)
30 }  
31 def findEmptySeat(courseId: Int, conn: Connection): Int = {
32   val sqlString = "select ID from SEATS where OCCUPIED='T' AND ID=" + courseId
33   conn.createStatement().executeQuery(sqlString).getInt("ID")
34 }
35 def updateSeatsStatus(seatId: Int, taken: Boolean, conn: Connection): Unit = {
36   val sqlString = "update SEATS set OCCUPIED ='"+taken.toString.toUpperCase.head + "' where ID=" + seatId
37   conn.createStatement().executeUpdate(sqlString)
38 }  

我这里模拟了一个培训学校内的一些业务。上面设计的是一些基本函数,可以分别对学员、导师、座位进行查询和更新。如果我们需要把更新工作放入事务处理(transaction)内的话我们可以这样做:

 1 def updateStudent(studentId: Int, courseId: Int): Unit = {
 2    val config = new BoneCPConfig()
 3    val bonecp = new BoneCP(config)
 4    val conn = bonecp.getConnection()
 5    conn.setReadOnly(false)
 6    conn.setAutoCommit(false)
 7    conn.rollback()
 8    try {
 9      val fee = getStudentFee(courseId, conn)
10      updateStudentFee(studentId,fee, conn)
11      conn.commit()
12    } catch {
13      case (e:Exception) => conn.rollback()
14    } finally {
15      conn.close()
16    }   
17 }
18 def updateStudentAndSeat(studentId: Int, courseId: Int): Unit = {
19    val config = new BoneCPConfig()
20    val bonecp = new BoneCP(config)
21    val conn = bonecp.getConnection()
22    conn.setReadOnly(false)
23    conn.setAutoCommit(false)
24    conn.rollback()
25    try {
26      val fee = getStudentFee(courseId, conn)
27      updateStudentFee(studentId,fee, conn)
28      val seatId = findEmptySeat(courseId, conn)
29      updateSeatsStatus(seatId, true, conn)
30      conn.commit()
31    } catch {
32      case (e:Exception) => conn.rollback()
33    } finally {
34      conn.close()
35    }   
36 }

马上可以发现在我们对这些函数在事务处理内进行组合使用时我们必须重新对事务处理进行设置,无法实现真正意义上的函数组合。如果我们认可FP风格的话,这里起码有两项弊处:一是源代码增加了大量的铺垫(boilerplate code),重复事务处理设置、二是每个更新函数都会产生副作用,换句话说就是这里那里都会有副作用影响,很难控制,这样就增加了程序的复杂程度,造成代码分析的困难。

我们希望达到的目标:

 1 /*
 2 def updateStudentAndSeat(studentId: Int): program {
 3  // findEmptySeat
 4  // updateStudentFee
 5  // updateSeatStatus 
 6 }
 7 
 8 def runDBTxn(prg: program) {
 9   //conn= getConnection
10   //try
11   // run(pre)
12   //commit
13   //catch
14   //rollback
15 }
16 runDBTxn(updateStudent)
17 runDBTxn(updateStudentAndSeat)
18 runDBTxn(updateSeatStatus)
19 */  

我们只在一个地方设置和运行事务处理。我们希望能把不同的program传入runDBTxn去运算。这不就是Free Monad的编程、运算关注分离模式嘛。那我们就试着用Free Monad来提供数据库事务处理支持。按上篇讨论的设计流程我们先设计ADT:

1 case class SqlOp[A](run: Connection => A)

模拟sql指令很简单,两种情况:query或者update。两者都可以用函数run表示:传入Connection,返回结果A,A有可能是Unit。要成为Free Monad就必须先获取SqlOp的Functor实例:

1 case class SqlOp[A](run: Connection => A)
2 implicit val sqlOpFunctor = new Functor[SqlOp] {
3   def map[A,B](sa: SqlOp[A])(f: A => B): SqlOp[B] = 
4     SqlOp{ (conn: Connection) => f(sa.run(conn)) }
5 }

基本功能的sql操作函数及升格Free:

 1 type Sql[A] = Free[SqlOp,A]
 2 def getTutorId(courseId: Int): Sql[Int] = 
 3   Free.liftF(SqlOp{
 4     (conn: Connection) => {
 5       val sqlString = "select TUTOR from COURSES where ID=" + courseId
 6       conn.createStatement().executeQuery(sqlString).getInt("ID")
 7     }  
 8   })
 9   
10 def getTutorPay(courseId: Int): Sql[Double] = 
11   Free.liftF(SqlOp{
12     (conn: Connection) => {
13       val sqlString = "select PAYAMT from COURSES where ID=" + courseId
14       conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
15     }  
16   })
17 def getStudentFee(courseId: Int): Sql[Double] = 
18   Free.liftF(SqlOp{
19     (conn: Connection) => {
20      val sqlString = "select FEEAMT from COURSES where ID=" + courseId
21      conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
22     }  
23   })
24 def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] = 
25   Free.liftF(SqlOp{
26     (conn: Connection) => {
27       val sqlString = "update TUTORS set PAYABLE = PAYABLE+"+plusAmt.toString + " where ID=" + tutorId
28       conn.createStatement().executeUpdate(sqlString)
29     }  
30   })
31 def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] = 
32   Free.liftF(SqlOp{
33     (conn: Connection) => {
34       val sqlString = "update STUDENTS set DUEAMT = DUEAMT+"+plusAmt.toString + " where ID=" + studentId
35       conn.createStatement().executeUpdate(sqlString)
36     }  
37   })
38 def findEmptySeat(courseId: Int): Sql[Int] = 
39   Free.liftF(SqlOp{
40     (conn: Connection) => {
41       val sqlString = "select ID from SEATS where OCCUPIED='T' AND ID=" + courseId
42       conn.createStatement().executeQuery(sqlString).getInt("ID")
43     }  
44   })
45 def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] = 
46   Free.liftF(SqlOp{
47     (conn: Connection) => {
48       val sqlString = "update SEATS set OCCUPIED ='"+taken.toString.toUpperCase.head + "' where ID=" + seatId
49       conn.createStatement().executeUpdate(sqlString)
50     }  
51   })

我们现在可以用这些升格成Free的函数来建设AST示范例子:

 1   def takeSeat(courseId: Int): Sql[Unit] = for {
 2     emptySeat <- findEmptySeat(courseId)
 3     _ <- updateSeatsStatus(emptySeat, true)
 4   } yield()
 5   def addCourse(studentId: Int, courseId: Int): Sql[Unit] = for {
 6     fee <- getStudentFee(courseId)
 7     pay <- getTutorPay(courseId)
 8     tutorId <- getTutorId(courseId)
 9     _ <- updateStudentFee(studentId, fee)
10     _ <- updateTutorPay(tutorId, pay)
11     _ <- takeSeat(courseId)
12   } yield()

addCourse对基本函数进行了组合,又调用了已经组合过一次的takeSeat,证明AST可以实现高度的函数组合。

下面示范实现相关的Interpreter:

1   def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A = 
2     ast.resume.fold ({
3       case x: SqlOp[Sql[A]] => runTransactionImpl(conn, x.run(conn))
4     },
5     (a: A) => a 
6    )

我们需要一个通用的事务处理方法:

 1   def runTransaction[A](ast: Sql[A]): Exception \/ A = {
 2     val config = new BoneCPConfig()
 3     val bonecp = new BoneCP(config)
 4     val conn = bonecp.getConnection()
 5     conn.setReadOnly(false)
 6     conn.setAutoCommit(false)
 7     conn.rollback()
 8     try {
 9       val result: A = runTransactionImpl(conn, ast)
10       result.right[Exception]
11     } catch {
12       case e: Exception => e.left[A]
13     } finally {
14       conn.close
15     } 
16   }

这样,我们可以在一个地方使用事务处理来运算任何事先设计的AST。

我们可以用不同的方法来实现Interpreter。下面就是用Free.foldMap来运算AST的示范。由于我们需要注入Connection,所以采用了Sql to State的自然转换(natural transformation):

 1   type SqlState[A] = State[Connection, A]
 2   object SqlToState extends (SqlOp ~> SqlState) {
 3     def apply[A](sa: SqlOp[A]): SqlState[A] = sa match {
 4       case SqlOp(f) => State {
 5         conn => (conn,f(conn))
 6       }
 7     }
 8   }
 9   def runTransactionImplState[A](conn: Connection, ast: Sql[A]) =
10     ast.foldMap(SqlToState).run(conn)

下面是这个用Free来实现FP风格数据库事务处理的完整示范代码:

  1 import scalaz._
  2 import Scalaz._
  3 import scala.language.higherKinds
  4 import scala.language.implicitConversions
  5 import com.jolbox.bonecp.BoneCP
  6 import com.jolbox.bonecp.BoneCPConfig
  7 import java.sql.Connection
  8 import java.sql.ResultSet
  9 
 10 object freedbtxns {
 11 
 12 case class SqlOp[A](run: Connection => A)
 13 implicit val sqlOpFunctor = new Functor[SqlOp] {
 14   def map[A,B](sa: SqlOp[A])(f: A => B): SqlOp[B] = 
 15     SqlOp{ (conn: Connection) => f(sa.run(conn)) }
 16 }
 17 type Sql[A] = Free[SqlOp,A]
 18 def getTutorId(courseId: Int): Sql[Int] = 
 19   Free.liftF(SqlOp{
 20     (conn: Connection) => {
 21       val sqlString = "select TUTOR from COURSES where ID=" + courseId
 22       conn.createStatement().executeQuery(sqlString).getInt("ID")
 23     }  
 24   })
 25   
 26 def getTutorPay(courseId: Int): Sql[Double] = 
 27   Free.liftF(SqlOp{
 28     (conn: Connection) => {
 29       val sqlString = "select PAYAMT from COURSES where ID=" + courseId
 30       conn.createStatement().executeQuery(sqlString).getDouble("PAYAMT")
 31     }  
 32   })
 33 def getStudentFee(courseId: Int): Sql[Double] = 
 34   Free.liftF(SqlOp{
 35     (conn: Connection) => {
 36      val sqlString = "select FEEAMT from COURSES where ID=" + courseId
 37      conn.createStatement().executeQuery(sqlString).getDouble("FEEAMT")
 38     }  
 39   })
 40 def updateTutorPay(tutorId: Int, plusAmt: Double): Sql[Unit] = 
 41   Free.liftF(SqlOp{
 42     (conn: Connection) => {
 43       val sqlString = "update TUTORS set PAYABLE = PAYABLE+"+plusAmt.toString + " where ID=" + tutorId
 44       conn.createStatement().executeUpdate(sqlString)
 45     }  
 46   })
 47 def updateStudentFee(studentId: Int, plusAmt: Double): Sql[Unit] = 
 48   Free.liftF(SqlOp{
 49     (conn: Connection) => {
 50       val sqlString = "update STUDENTS set DUEAMT = DUEAMT+"+plusAmt.toString + " where ID=" + studentId
 51       conn.createStatement().executeUpdate(sqlString)
 52     }  
 53   })
 54 def findEmptySeat(courseId: Int): Sql[Int] = 
 55   Free.liftF(SqlOp{
 56     (conn: Connection) => {
 57       val sqlString = "select ID from SEATS where OCCUPIED='T' AND ID=" + courseId
 58       conn.createStatement().executeQuery(sqlString).getInt("ID")
 59     }  
 60   })
 61 def updateSeatsStatus(seatId: Int, taken: Boolean): Sql[Unit] = 
 62   Free.liftF(SqlOp{
 63     (conn: Connection) => {
 64       val sqlString = "update SEATS set OCCUPIED ='"+taken.toString.toUpperCase.head + "' where ID=" + seatId
 65       conn.createStatement().executeUpdate(sqlString)
 66     }  
 67   })
 68 
 69   def takeSeat(courseId: Int): Sql[Unit] = for {
 70     emptySeat <- findEmptySeat(courseId)
 71     _ <- updateSeatsStatus(emptySeat, true)
 72   } yield()
 73   def addCourse(studentId: Int, courseId: Int): Sql[Unit] = for {
 74     fee <- getStudentFee(courseId)
 75     pay <- getTutorPay(courseId)
 76     tutorId <- getTutorId(courseId)
 77     _ <- updateStudentFee(studentId, fee)
 78     _ <- updateTutorPay(tutorId, pay)
 79     _ <- takeSeat(courseId)
 80   } yield()
 81 
 82   def runTransactionImpl[A](conn: Connection, ast: Sql[A]): A = 
 83     ast.resume.fold ({
 84       case x: SqlOp[Sql[A]] => runTransactionImpl(conn, x.run(conn))
 85     },
 86     (a: A) => a 
 87    )
 88   def runTransaction[A](ast: Sql[A]): Exception \/ A = {
 89     val config = new BoneCPConfig()
 90     val bonecp = new BoneCP(config)
 91     val conn = bonecp.getConnection()
 92     conn.setReadOnly(false)
 93     conn.setAutoCommit(false)
 94     conn.rollback()
 95     try {
 96       val result: A = runTransactionImpl(conn, ast)
 97       result.right[Exception]
 98     } catch {
 99       case e: Exception => e.left[A]
100     } finally {
101       conn.close
102     } 
103   }
104 }

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏JAVA技术站

Sharding-Jdbc之读写分离导读 原

      Sharding-JDBC是一个开源的分布式数据库中间件,它无需额外部署和依赖,完全兼容JDBC和各种ORM框架。Sharding-JDBC作为面向...

1403
来自专栏杨建荣的学习笔记

pl/sql中的参数模式(r4笔记第54天)

在平时的工作中,可能通过pl/sql传入参数来做一些特定的操作,参数模式一般有In,out.in out这几种 比如dbms_sqltune下的PREPARE_...

2994
来自专栏Java编程技术

SpringBoot使用及原理浅尝

最近微服务很热,而SpringBoot以轻量级和内嵌tomcat,方便启动调试为微服务越来越被采用,而现在前沿的技术的demo一般都也使用SpringBoot编...

822
来自专栏乐沙弥的世界

PL/SQL --> 异常处理(Exception)

Exception是一种PL/SQL标识符,当运行的PL/SQL块出现错误或警告,则会触发异常处理。为了提高程序的健壮性,可以在PL/SQL块中引

641
来自专栏一个会写诗的程序员的博客

《Spring Boot极简教程》 第6章 Springboot数据库集成

在SpringBoot集成MyBatis时,我们将去掉和Mybatis配置相关的xml文件配置方式,用注解和属性配置来代替这些繁杂的配置。

581
来自专栏解Bug之路

MySql之自动生成CRUD代码

MyBatis能够通过获取MySql中的information_schema从而获取表的字段等信息,最后通过这些信息生成代码。 笔者受此启发,将MyBatis...

733
来自专栏后端之路

SpringBoot实现Mybatis多数据源方案

背景 目前报表导出需要多数据库的数据,因此我们需要做Mybatis多数据源的配置 我们之前使用Spring的AbstractRoutingDataSource ...

5667
来自专栏杂烩

hadoop MapReduce编写一个分组统计并排序查询-分组

说一下需求,有一张销售统计表,记录每个销售员每天的销售情况,现在要统计出某一月的每个销售员的销售情况并且按照销售额从高往低排序(hadoop默认是升序)。

752
来自专栏恰同学骚年

轻量级ORM框架初探-Dapper与PetaPoco的基本使用

  EF是传统的ORM框架,也是一个比较重量级的ORM框架。这里仍然使用EF的原因在于为了突出轻量级ORM框架的性能,所谓有对比才有更优的选择。

1103
来自专栏小筱月

java ssm框架实现分页功能 (oracle)

LIMIT a,b : 参数 a:第 a 条数据开始查询(不包括第 a 条), 参数 b:查询 b 条数据

732

扫码关注云+社区