前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SDP(4):ScalikeJDBC- JDBC-Engine:Updating

SDP(4):ScalikeJDBC- JDBC-Engine:Updating

作者头像
用户1150956
发布2018-03-16 16:38:22
1.4K0
发布2018-03-16 16:38:22
举报

    在上一篇博文里我们把JDBC-Engine的读取操作部分分离出来进行了讨论,在这篇准备把更新Update部分功能介绍一下。当然,JDBC-Engine的功能是基于ScalikeJDBC的,所有的操作和属性都包嵌在SQL这个类型中:

/**
 * SQL abstraction.
 *
 * @param statement SQL template
 * @param rawParameters parameters
 * @param f  extractor function
 * @tparam A return type
 */
abstract class SQL[A, E <: WithExtractor](
  val statement: String,
  private[scalikejdbc] val rawParameters: Seq[Any]
)(f: WrappedResultSet => A)
{...}

Update功能置于下面这几个子类中:

/**
 * SQL which execute java.sql.Statement#executeUpdate().
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param before before filter
 * @param after after filter
 */
class SQLUpdate(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  def apply()(implicit session: DBSession): Int = {
    val attributesSwitcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    session match {
      case AutoSession =>
        DB.autoCommit(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case NamedAutoSession(name, _) =>
        NamedDB(name, session.settings).autoCommit(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case ReadOnlyAutoSession =>
        DB.readOnly(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case ReadOnlyNamedAutoSession(name, _) =>
        NamedDB(name, session.settings).readOnly(DBSessionWrapper(_, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*))
      case _ =>
        DBSessionWrapper(session, attributesSwitcher).updateWithFilters(before, after, statement, parameters: _*)
    }
  }

}

/**
 * SQL which execute java.sql.Statement#execute().
 *
 * @param statement SQL template
 * @param parameters parameters
 * @param before before filter
 * @param after after filter
 */
class SQLExecution(val statement: String, val parameters: Seq[Any], val tags: Seq[String] = Nil)(
    val before: (PreparedStatement) => Unit
)(
    val after: (PreparedStatement) => Unit
) {

  def apply()(implicit session: DBSession): Boolean = {
    val attributesSwitcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    val f: DBSession => Boolean = DBSessionWrapper(_, attributesSwitcher).executeWithFilters(before, after, statement, parameters: _*)
    // format: OFF
    session match {
      case AutoSession                       => DB.autoCommit(f)
      case NamedAutoSession(name, _)         => NamedDB(name, session.settings).autoCommit(f)
      case ReadOnlyAutoSession               => DB.readOnly(f)
      case ReadOnlyNamedAutoSession(name, _) => NamedDB(name, session.settings).readOnly(f)
      case _                                 => f(session)
    }
    // format: ON
  }

}
/**
 * SQL which execute java.sql.Statement#executeBatch().
 *
 * @param statement SQL template
 * @param parameters parameters
 */
class SQLBatch(val statement: String, val parameters: Seq[Seq[Any]], val tags: Seq[String] = Nil) {

  def apply[C[_]]()(implicit session: DBSession, cbf: CanBuildFrom[Nothing, Int, C[Int]]): C[Int] = {
    val attributesSwitcher = new DBSessionAttributesSwitcher(SQL("").tags(tags: _*))
    val f: DBSession => C[Int] = DBSessionWrapper(_, attributesSwitcher).batch(statement, parameters: _*)
    // format: OFF
    session match {
      case AutoSession                       => DB.autoCommit(f)
      case NamedAutoSession(name, _)         => NamedDB(name, session.settings).autoCommit(f)
      case ReadOnlyAutoSession               => DB.readOnly(f)
      case ReadOnlyNamedAutoSession(name, _) => NamedDB(name, session.settings).readOnly(f)
      case _                                 => f(session)
    }
    // format: ON
  }

}

按照JDBC-Engine的功能设计要求,我们大约把Update功能分成数据表构建操作DDL、批次运算Batch、和普通Update几种类型。我们是通过JDBCContext来定义具体的Update功能类型:

object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXEDDL= 1
    val SQL_UPDATE = 2
    val RETURN_GENERATED_KEYVALUE = true
    val RETURN_UPDATED_COUNT = false

  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String] = Nil,
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Seq[Option[Any]] = Nil,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None) {

    ctx =>

    //helper functions

    def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

    def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

    def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

    def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

    def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(preAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(postAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(Seq(_parameters))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(_parameters),
          returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendBatchParameters(_parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
        throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

      var matchParams = true
      if (ctx.parameters != Nil)
        if (ctx.parameters.head.size != _parameters.size)
          matchParams = false
      if (matchParams) {
        ctx.copy(
          parameters = ctx.parameters ++ Seq(_parameters)
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
    }

    def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
         throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
      ctx.copy(
        returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
      )
    }

     def setQueryCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_SELECT,
          batch = false
        )
      }

      def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
      }

      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
      }
      def setBatchCommand(_statement: String): JDBCContext = {
        ctx.copy (
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
      }
  }

JDBCContext还提供了不少的Helper函数来协助构建特别功能的JDBCContext对象,如:setQueryCommand, setDDLCommand, setUpdateCommand, setBatchCommand。这些Helper函数提供Update功能定义的几个主要元素包括:SQL语句主体包括参数占位的statement、输入参数parameter、是否需要返回系统自动产生的主键returnGeneratedKey。在ScalikeJDBC中所有类型的Update功能可以用下面几类内部函数实现,包括:

  private[this] def batchInternal[C[_], A](
    template: String,
    paramsList: Seq[Seq[Any]],
    execute: StatementExecutor => scala.Array[A]
  )(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    ensureNotReadOnlySession(template)
    paramsList match {
      case Nil => Seq.empty[A].to[C]
      case _ =>
        using(createBatchStatementExecutor(
          conn = conn,
          template = template,
          returnGeneratedKeys = false,
          generatedKeyName = None
        )) { executor =>
          paramsList.foreach {
            params =>
              executor.bindParams(params)
              executor.addBatch()
          }
          execute(executor).to[C]
        }
    }
  }
  private[this] def updateWithFiltersInternal[A](
    returnGeneratedKeys: Boolean,
    before: (PreparedStatement) => Unit,
    after: (PreparedStatement) => Unit,
    template: String,
    execute: StatementExecutor => A,
    params: Seq[Any]
  ): A = {
    ensureNotReadOnlySession(template)
    using(createStatementExecutor(
      conn = conn,
      template = template,
      params = params,
      returnGeneratedKeys = returnGeneratedKeys
    )) {
      executor =>
        before(executor.underlying)
        val count = execute(executor)
        after(executor.underlying)
        count
    }
  }
  private[this] def updateWithAutoGeneratedKeyNameAndFiltersInternal[A](
    returnGeneratedKeys: Boolean,
    generatedKeyName: String,
    before: (PreparedStatement) => Unit,
    after: (PreparedStatement) => Unit,
    template: String,
    execute: StatementExecutor => A,
    params: Seq[Any]
  ): A = {
    ensureNotReadOnlySession(template)
    using(createStatementExecutor(
      conn = conn,
      template = template,
      params = params,
      returnGeneratedKeys = returnGeneratedKeys,
      generatedKeyName = Option(generatedKeyName)
    )) {
      executor =>
        before(executor.underlying)
        val count = execute(executor)
        after(executor.underlying)
        count
    }
  }

我们可以看到所有类型的Update都是通过构建StatementExecutor并按其属性进行运算来实现的:

/**
 * java.sql.Statement Executor.
 *
 * @param underlying preparedStatement
 * @param template SQL template
 * @param singleParams parameters for single execution (= not batch execution)
 * @param isBatch is batch flag
 */
case class StatementExecutor(
    underlying: PreparedStatement,
    template: String,
    connectionAttributes: DBConnectionAttributes,
    singleParams: Seq[Any] = Nil,
    tags: Seq[String] = Nil,
    isBatch: Boolean = false,
    settingsProvider: SettingsProvider = SettingsProvider.default
) extends LogSupport with UnixTimeInMillisConverterImplicits with AutoCloseable {...}

这个StatementExcutor类的属性和我们的JDBCContext属性很接近。好了,回到JDBC-Engine Update功能定义。首先是DDL功能:

 def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
       if (ctx.sqlType != SQL_EXEDDL) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
      }
      else {
        NamedDB(ctx.dbName) localTx { implicit session =>
          Try {
                ctx.statements.foreach { stm =>
                  val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                    before = WrappedResultSet => {})(
                    after = WrappedResultSet => {})

                  ddl.apply()
              }
            "SQL_EXEDDL executed succesfully."
          }
        }
      }
    }

所有JDBC-Engine的Update功能都是一个事务处理Transaction中的多条更新语句。DDL语句不需要参数所以只需要提供statement就足够了。下面是这个函数的使用示范:

 ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val dropSQL: String ="""
      drop table members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  var ctx = JDBCContext('h2)
    try {
      ctx = ctx.setDDLCommand(dropSQL)
        .appendDDLCommand(createSQL)
    }
    catch {
       case e: Exception => println(e.getMessage)
    }

  val resultCreateTable = jdbcExcuteDDL(ctx)

  resultCreateTable match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

在这里我们修改了上次使用的members表,增加了一个blob类的picture列。这个示范在一个完整的Transaction里包括了两条DDL语句。

批次更新batch-update是指多条输入参数在一条统一的statement上施用:

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else {
        if (ctx.batch) {
          if (noReturnKey(ctx)) {
            val usql = SQL(ctx.statements.head)
              .tags(ctx.queryTags: _*)
              .batch(ctx.parameters: _*)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[Seq]()
                Seq.empty[Long].to[C]
              }
            }
          } else {
            val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[C]()
              }
            }
          }

        } else {
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
        }
      }
    }

如果batch-update是某种Insert操作的话我们可以通过cox.batch注明返回由JDBC系统自动产生的唯一键。这些主键一般在构建表时注明,包括:serial, auto_increment等。如果不返回主键则返回update语句的更新状态如更新数据条数等。在上面这个函数里SQLBatchWithGeneratedKey.apply()返回insert数据主键,所以statement必须是INSERT语句。SQLBatch.apply()则用来运算update语句并返回更新数据的条数。下面是jdbcBatchUpdate函数的使用示范:

 val insertSQL = "insert into members(name,birthday,description,created_at,picture) values (?, ?, ?, ?, ?)"
  val dateCreated = DateTime.now

  import java.io.FileInputStream

  val picfile = new File("/users/tiger/Nobody.png")
  val fis = new FileInputStream(picfile)

  ctx = JDBCContext('h2)
  try {
    ctx = ctx.setBatchCommand(insertSQL).appendBatchParameters(
      "John",new LocalDate("2008-03-01"),"youngest user",dateCreated,None).appendBatchParameters(
      "peter", None, "no birth date", dateCreated, fis)
      .appendBatchParameters(
        "susan", None, "no birth date", dateCreated, None)
      .setBatchReturnGeneratedKeyOption(JDBCContext.RETURN_GENERATED_KEYVALUE)
  }
  catch {
    case e: Exception => println(e.getMessage)
  }

  var resultInserts = jdbcBatchUpdate(ctx)

  resultInserts match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

上面这个例子里一个transaction批次包含了三条Insert语句,其中一条涉及存入picture字段:我们只需要把图像文件InputStream作为普通参数传人即可。我们也可以把任何类型的非batch-update语句捆绑在统一的transaction里运算,而且可以指定每条update返回类型:自动产生的主键或者更新数据条数:

def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else {
        if (!ctx.batch) {
          if (ctx.statements.size == 1)
            singleTxUpdate(ctx)
          else
            multiTxUpdates(ctx)
        } else
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))

      }
    }

这个update函数又被细分为单条语句singleTxUpdate和多条语句multiTxUpdates。无论单条或多条update函数又被分为返回主键或更新状态类型的函数:

 private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
       implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
       val Some(key) :: xs = ctx.returnGeneratedKey
       val params: Seq[Any] = ctx.parameters match {
         case Nil => Nil
         case p@_ => p.head
       }
       val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
       Try {
         NamedDB(ctx.dbName) localTx { implicit session =>
           session.fetchSize(ctx.fetchSize)
           ctx.queryTimeout.foreach(session.queryTimeout(_))
           val result = usql.apply()
           Seq(result).to[C]
         }
       }
     }

      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val params: Seq[Any] = ctx.parameters match {
        case Nil => Nil
        case p@_ => p.head
      }
      val before = ctx.preAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val after = ctx.postAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
      Try {
        NamedDB(ctx.dbName) localTx {implicit session =>
          session.fetchSize(ctx.fetchSize)
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          val result = usql.apply()
          Seq(result.toLong).to[C]
        }
      }

    }

    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (noReturnKey(ctx))
        singleTxUpdateNoReturnKey(ctx)
      else
        singleTxUpdateWithReturnKey(ctx)
    }

    private def noReturnKey(ctx: JDBCContext): Boolean = {
      if (ctx.returnGeneratedKey != Nil) {
        val k :: xs = ctx.returnGeneratedKey
         k match {
          case None => true
          case Some(k) => false
        }
      } else true
    }

    def noActon: PreparedStatement=>Unit = pstm => {}

    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
              case Nil => Seq.fill(ctx.statements.size)(None)
              case k@_ => k
            }
            val sqlcmd = ctx.statements zip ctx.parameters zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
     }

下面是这个函数的使用示范: 

 val updateSQL = "update members set description = ? where id < ?"
  ctx = JDBCContext('h2)
  try {
     ctx = ctx.setUpdateCommand(JDBCContext.RETURN_GENERATED_KEYVALUE,insertSQL,
       "max", None, "no birth date", dateCreated, None)
       .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT, updateSQL, "id++", 10)
      .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT,"delete members where id = 1")
  }
  catch {
    case e: Exception => println(e.getMessage)
  }
  var resultUpdates = jdbcTxUpdates[Vector](ctx)

  resultUpdates match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

在这个例子里我们把insert,update和delete混在了一个transaction里。最后,我们再把试验数据,包括blob字段读出来:

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime,
                     picture: InputStream)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at"),
    picture = rs.binaryStream("picture")
  )

  ctx = JDBCContext('h2)
  ctx = ctx.setQueryCommand("select * from members").setQueryTimeout(Some(1000))

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  val buffer = new Array[Byte](1024)

  vecMember.foreach {row =>
    println(s"id: ${row.id} name: ${row.name}")
    println(s"name: ${row.name}")
    if (row.picture == null)
      println("picture empty")
    else {
      val fname = s"/users/tiger/pic${row.id}.png"
      val file = new File(fname)
      val output = new FileOutputStream(file)

      println(s"saving picture to $fname")

      row.picture.available()
      while (row.picture.read(buffer) > 0) {
        output.write(buffer)
      }

      output.close()

    }
  }

下面是本次讨论的示范源代码:

build.sbt

name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

JDBCEngine.scala

package jdbccontext
import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import scalikejdbc._

import scala.util._
import scalikejdbc.TxBoundary.Try._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXEDDL= 1
    val SQL_UPDATE = 2
    val RETURN_GENERATED_KEYVALUE = true
    val RETURN_UPDATED_COUNT = false

  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String] = Nil,
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Seq[Option[Any]] = Nil,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None) {

    ctx =>

    //helper functions

    def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

    def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

    def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

    def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

    def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(preAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
        !ctx.batch && ctx.statements.size == 1)
        ctx.copy(postAction = action)
      else
        throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
    }

    def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(Seq(_parameters))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
        ctx.copy(
          statements = ctx.statements ++ Seq(_statement),
          parameters = ctx.parameters ++ Seq(_parameters),
          returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: option not supported!")
    }

    def appendBatchParameters(_parameters: Any*): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
        throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

      var matchParams = true
      if (ctx.parameters != Nil)
        if (ctx.parameters.head.size != _parameters.size)
          matchParams = false
      if (matchParams) {
        ctx.copy(
          parameters = ctx.parameters ++ Seq(_parameters)
        )
      } else
        throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
    }

    def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
      if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
         throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
      ctx.copy(
        returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
      )
    }

     def setQueryCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_SELECT,
          batch = false
        )
      }

      def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
      }

      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
        ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
      }
      def setBatchCommand(_statement: String): JDBCContext = {
        ctx.copy (
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
      }
  }

  object JDBCEngine {

    import JDBCContext._

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_SELECT'!")
      }
    }

    def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
       if (ctx.sqlType != SQL_EXEDDL) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
      }
      else {
        NamedDB(ctx.dbName) localTx { implicit session =>
          Try {
                ctx.statements.foreach { stm =>
                  val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                    before = WrappedResultSet => {})(
                    after = WrappedResultSet => {})

                  ddl.apply()
              }
            "SQL_EXEDDL executed succesfully."
          }
        }
      }
    }

    def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else {
        if (ctx.batch) {
          if (noReturnKey(ctx)) {
            val usql = SQL(ctx.statements.head)
              .tags(ctx.queryTags: _*)
              .batch(ctx.parameters: _*)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[Seq]()
                Seq.empty[Long].to[C]
              }
            }
          } else {
            val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
            Try {
              NamedDB(ctx.dbName) localTx { implicit session =>
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                usql.apply[C]()
              }
            }
          }

        } else {
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
        }
      }
    }
     private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
       implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
       val Some(key) :: xs = ctx.returnGeneratedKey
       val params: Seq[Any] = ctx.parameters match {
         case Nil => Nil
         case p@_ => p.head
       }
       val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
       Try {
         NamedDB(ctx.dbName) localTx { implicit session =>
           session.fetchSize(ctx.fetchSize)
           ctx.queryTimeout.foreach(session.queryTimeout(_))
           val result = usql.apply()
           Seq(result).to[C]
         }
       }
     }

      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      val params: Seq[Any] = ctx.parameters match {
        case Nil => Nil
        case p@_ => p.head
      }
      val before = ctx.preAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val after = ctx.postAction match {
        case None => pstm: PreparedStatement => {}
        case Some(f) => f
      }
      val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
      Try {
        NamedDB(ctx.dbName) localTx {implicit session =>
          session.fetchSize(ctx.fetchSize)
          ctx.queryTimeout.foreach(session.queryTimeout(_))
          val result = usql.apply()
          Seq(result.toLong).to[C]
        }
      }

    }

    private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (noReturnKey(ctx))
        singleTxUpdateNoReturnKey(ctx)
      else
        singleTxUpdateWithReturnKey(ctx)
    }

    private def noReturnKey(ctx: JDBCContext): Boolean = {
      if (ctx.returnGeneratedKey != Nil) {
        val k :: xs = ctx.returnGeneratedKey
         k match {
          case None => true
          case Some(k) => false
        }
      } else true
    }

    def noActon: PreparedStatement=>Unit = pstm => {}

    def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
        Try {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
              case Nil => Seq.fill(ctx.statements.size)(None)
              case k@_ => k
            }
            val sqlcmd = ctx.statements zip ctx.parameters zip keys
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
     }


    def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
      implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
      if (ctx.statements == Nil)
        throw new IllegalStateException("JDBCContex setting error: statements empty!")
      if (ctx.sqlType != SQL_UPDATE) {
        Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
      }
      else {
        if (!ctx.batch) {
          if (ctx.statements.size == 1)
            singleTxUpdate(ctx)
          else
            multiTxUpdates(ctx)
        } else
          Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))

      }
    }

  }

JDBCEngineDemo.scala

import java.io.File
import java.io.FileOutputStream
import java.io.InputStream
import jdbccontext._
import configdbs._
import org.joda.time._
import scala.util._
import JDBCEngine._

import scalikejdbc._
object CrudDemo extends App {
  ConfigDBsWithEnv("dev").setup('h2)
  ConfigDBsWithEnv("dev").loadGlobalSettings()

  val dropSQL: String ="""
      drop table members
    """

  val createSQL: String ="""
    create table members (
      id serial not null primary key,
      name varchar(30) not null,
      description varchar(1000),
      birthday date,
      created_at timestamp not null,
      picture blob
    )"""

  var ctx = JDBCContext('h2)
    try {
      ctx = ctx.setDDLCommand(dropSQL)
        .appendDDLCommand(createSQL)
    }
    catch {
       case e: Exception => println(e.getMessage)
    }

  val resultCreateTable = jdbcExcuteDDL(ctx)

  resultCreateTable match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }

  val insertSQL = "insert into members(name,birthday,description,created_at,picture) values (?, ?, ?, ?, ?)"
  val dateCreated = DateTime.now

  import java.io.FileInputStream

  val picfile = new File("/users/tiger/Nobody.png")
  val fis = new FileInputStream(picfile)

  ctx = JDBCContext('h2)
  try {
    ctx = ctx.setBatchCommand(insertSQL).appendBatchParameters(
      "John",new LocalDate("2008-03-01"),"youngest user",dateCreated,None).appendBatchParameters(
      "peter", None, "no birth date", dateCreated, fis)
      .appendBatchParameters(
        "susan", None, "no birth date", dateCreated, None)
      .setBatchReturnGeneratedKeyOption(JDBCContext.RETURN_GENERATED_KEYVALUE)
  }
  catch {
    case e: Exception => println(e.getMessage)
  }

  var resultInserts = jdbcBatchUpdate(ctx)

  resultInserts match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }


  val updateSQL = "update members set description = ? where id < ?"
  ctx = JDBCContext('h2)
  try {
     ctx = ctx.setUpdateCommand(JDBCContext.RETURN_GENERATED_KEYVALUE,insertSQL,
       "max", None, "no birth date", dateCreated, None)
       .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT, updateSQL, "id++", 10)
      .appendUpdateCommand(JDBCContext.RETURN_UPDATED_COUNT,"delete members where id = 1")
  }
  catch {
    case e: Exception => println(e.getMessage)
  }
  var resultUpdates = jdbcTxUpdates[Vector](ctx)

  resultUpdates match {
    case Success(msg) => println(msg)
    case Failure(err) => println(s"${err.getMessage}")
  }


  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime,
                     picture: InputStream)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at"),
    picture = rs.binaryStream("picture")
  )

  ctx = JDBCContext('h2)
  ctx = ctx.setQueryCommand("select * from members").setQueryTimeout(Some(1000))

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  val buffer = new Array[Byte](1024)

  vecMember.foreach {row =>
    println(s"id: ${row.id} name: ${row.name}")
    println(s"name: ${row.name}")
    if (row.picture == null)
      println("picture empty")
    else {
      val fname = s"/users/tiger/pic${row.id}.png"
      val file = new File(fname)
      val output = new FileOutputStream(file)

      println(s"saving picture to $fname")

      row.picture.available()
      while (row.picture.read(buffer) > 0) {
        output.write(buffer)
      }

      output.close()

    }
  }

}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-02-08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档