SDP(3): ScalikeJDBC - JDBC-Engine: Fetching

Author: 用户1150956
Published: 2018-03-16 16:38:56

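ScalikeJDBC covers the basic JDBC functionality fairly completely and implements it concisely, which should also keep its runtime overhead comparatively low. In principle it is a good fit for a JDBC-Engine: a layer that sits between the various JDBC tool libraries and the database instances, accepts JDBC operation requests, connects to the target database, runs the operation, and returns the result. Typically, JDBC tool libraries such as ORM and FRM packages use their own DSLs to program data management over complex table relationships, and in the end produce SQL, i.e. a (prepared) statement plus parameters, which is passed to the JDBC driver of the target database to execute and produce results. Described this way, the main job of a JDBC-Engine is to support the following function: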

jdbcRunSQL(context: JDBCContext): JDBCResultSet

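The caller supplies a value of type JDBCContext, and jdbcRunSQL carries out the rest of the operation and returns the result. Seen this way, JDBCContext needs to provide at least the following properties:

1. Database connection: which connection pool to use

2. Execution parameters: fetchSize, queryTimeout, queryTags. These all apply to the SQL currently being run

3. Query parameters:

    Query type: select/execute/update, single/batch, pre/post query actions, generateKey

    SQL statement: statements: Seq[String], parameters: Seq[Seq[Any]]

Here is the JDBCContext type definition: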

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)
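
The comment on returnGeneratedKey describes an untyped encoding: None for no key, Some(1) for a column index, Some("id") for a column name. This article does not show the write side, so the following is only a rough sketch of how such a value might be decoded; GeneratedKeySketch, KeySpec, NoKey, ByIndex, ByName and decode are hypothetical names introduced here for illustration.

```scala
// Hypothetical sketch: decoding the Option[Any] carried by returnGeneratedKey.
// None of these names come from the article or from ScalikeJDBC.
object GeneratedKeySketch {
  sealed trait KeySpec
  case object NoKey extends KeySpec
  final case class ByIndex(idx: Int) extends KeySpec
  final case class ByName(col: String) extends KeySpec

  def decode(spec: Option[Any]): KeySpec = spec match {
    case None            => NoKey        // no generated key requested
    case Some(i: Int)    => ByIndex(i)   // key column selected by position
    case Some(s: String) => ByName(s)    // key column selected by name
    case Some(other)     =>
      throw new IllegalArgumentException(s"unsupported generated-key spec: $other")
  }
}
```

Because the field is typed Option[Any], a value such as Some(1.5) still compiles, so a check of this kind can only happen at run time.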
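
On reflection, splitting JDBC reads and writes into two separate functions makes the engine easier to use, because it matches common programming patterns and habits. So SQL with sqlType = SQL_SELECT gets a function of its own:
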
   def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          // a query uses only the first parameter group, if there is one
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          // build the query from the first statement; the placeholder extractor is replaced by map below
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
          // apply the per-query settings carried by the context
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          // run against the connection pool registered under ctx.dbName
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          // collect the converted rows into the requested collection type C
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
      }
    }
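
Note that the function reads only ctx.statements.head and the first entry of ctx.parameters. The parameter selection can be written equivalently with headOption. Here is a minimal stand-alone sketch, where FirstParamsSketch and firstParams are hypothetical names and plain Seq values stand in for the context:

```scala
// Hypothetical stand-alone equivalent of the params pattern match above.
object FirstParamsSketch {
  // Empty input yields no parameters; otherwise only the first group is used.
  def firstParams(parameters: Seq[Seq[Any]]): Seq[Any] =
    parameters.headOption.getOrElse(Nil)
}
```

Any further statements or parameter groups in the context are ignored by the query function, and an empty statements sequence would make .head throw a NoSuchElementException.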

A noExtractor function is also needed to satisfy the parameter shape that SQLToCollectionImpl requires:

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }
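
The reason a function returning Nothing can be passed where a WrappedResultSet => A is expected is that Scala functions are covariant in their result type, and Nothing is a subtype of every type. The following stand-alone sketch illustrates the idea without ScalikeJDBC; Row, Query, raw and named are hypothetical stand-ins, not the library's types.

```scala
// Hypothetical sketch of the placeholder-extractor idea, using only the standard library.
object NoExtractorSketch {
  type Row = Map[String, Any]

  // A query object that always carries some row extractor.
  final class Query[A](val sql: String)(val extractor: Row => A) {
    // Swapping the extractor yields a query with a new result type.
    def map[B](f: Row => B): Query[B] = new Query[B](sql)(f)
    def run(rows: Seq[Row]): Seq[A] = rows.map(extractor)
  }

  // Row => Nothing conforms to Row => A for any A, so it fits every Query[A].
  def noExtractor(message: String): Row => Nothing =
    (_: Row) => throw new IllegalStateException(message)

  // The placeholder only fails if it is actually invoked on a row.
  val raw: Query[String] =
    new Query[String]("select name from members")(noExtractor("no extractor set"))

  // After map, the real converter is used and the placeholder is never called.
  val named: Query[String] = raw.map(row => row("name").toString)
}
```

Running raw against any row throws the IllegalStateException, while named converts rows normally.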

Let's try out jdbcQueryResult:

import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")
}
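
ConfigDBsWithEnv("dev") selects the dev block of application.conf: the configuration code listed later in this article looks settings up under envPrefix + "db." + dbName. Below is a small sketch of that path construction, assuming the prefix is the environment name followed by a dot when an environment is set and empty otherwise. ConfigPathSketch, envPrefix and dbPath here are hypothetical stand-alone names.

```scala
// Hypothetical sketch of how an environment name might map to a config path.
object ConfigPathSketch {
  // Assumption: "<env>." when an environment is given, otherwise no prefix.
  def envPrefix(env: Option[String]): String = env.map(_ + ".").getOrElse("")

  // Path of the settings block for one named database.
  def dbPath(env: Option[String], dbName: String): String =
    envPrefix(env) + "db." + dbName
}
```

Under that assumption, the h2 pool used by the demo is configured at dev.db.h2, and the same database name without an environment would be read from db.h2.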

If the data management logic is written with the Slick DSL, this is how it can be hooked up to the JDBC-Engine:

 object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))

The output is correct.

Here is the source code for this demonstration:

build.sbt

name := "learn-scalikeJDBC"

version := "0.1"

scalaVersion := "2.12.4"

// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
)

resources/application.conf, covering H2, MySQL and PostgreSQL

# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"
    }
  }

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    }
    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    }
  }
  # scalikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}

HikariConfig.scala: HikariCP connection pool implementation

package configdbs
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)
      }
      props
    }
    toProps(c.root.asScala)
  }

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
}

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
}

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
  }

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
      hconf.setDataSourceClassName(c.getString("dataSourceClass"))
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
    }
    hconf.setJdbcUrl(c.getStringOr("url", null))
    c.getStringOpt("user").foreach(hconf.setUsername)
    c.getStringOpt("password").foreach(hconf.setPassword)
    c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
    c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))

    hconf

  }
}

import scalikejdbc._
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
          Class.forName(hconf.getDriverClassName)
        }
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      }
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
          Class.forName(driver)
        }
        ConnectionPool.add(dbName, url, user, password, cpSettings)
      }
    }
  }

  def setupAll(): Unit = {
    loadGlobalSettings()
    dbNames.foreach { dbName => setup(Symbol(dbName)) }
  }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    ConnectionPool.close(dbName)
  }

  def closeAll(): Unit = {
    ConnectionPool.closeAll
  }

}


object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)
}
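
hikariCPConfig derives the pool size from three optional settings: numThreads defaults to 20, maxConnections to numThreads * 5, and minConnections to numThreads. The following is a stand-alone sketch of just that defaulting logic, with a plain Map standing in for the Typesafe Config block. PoolSizingSketch, PoolSize and poolSize are hypothetical names.

```scala
// Hypothetical sketch of the pool-size defaults used in hikariCPConfig.
object PoolSizingSketch {
  final case class PoolSize(maximumPoolSize: Int, minimumIdle: Int)

  // settings stands in for one db.<name> block of application.conf.
  def poolSize(settings: Map[String, Int]): PoolSize = {
    val numThreads = settings.getOrElse("numThreads", 20)
    PoolSize(
      maximumPoolSize = settings.getOrElse("maxConnections", numThreads * 5),
      minimumIdle = settings.getOrElse("minConnections", numThreads)
    )
  }
}
```

With the dev.db.h2 settings above (numThreads = 10, maxConnections = 12, minConnections = 4) this gives a maximum pool size of 12 and a minimum idle of 4; with none of the three set it gives 100 and 20.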

JDBCEngine.scala: implementation of the jdbcQueryResult function

import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import scalikejdbc._

  object JDBCContext {
    type SQLTYPE = Int
    val SQL_SELECT: Int = 0
    val SQL_EXECUTE = 1
    val SQL_UPDATE = 2

    def returnColumnByIndex(idx: Int) = Some(idx)

    def returnColumnByName(col: String) = Some(col)
  }

  case class JDBCContext(
                          dbName: Symbol,
                          statements: Seq[String],
                          parameters: Seq[Seq[Any]] = Nil,
                          fetchSize: Int = 100,
                          queryTimeout: Option[Int] = None,
                          queryTags: Seq[String] = Nil,
                          sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                          batch: Boolean = false,
                          returnGeneratedKey: Option[Any] = None,
                          // no return: None, return by index: Some(1), by name: Some("id")
                          preAction: Option[PreparedStatement => Unit] = None,
                          postAction: Option[PreparedStatement => Unit] = None)

  object JDBCEngine {

    import JDBCContext._

    private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
      throw new IllegalStateException(message)
    }

    def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
         ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
      ctx.sqlType match {
        case SQL_SELECT => {
          val params: Seq[Any] = ctx.parameters match {
            case Nil => Nil
            case p@_ => p.head
          }
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          ctx.queryTags.foreach(rawSql.tags(_))
          rawSql.fetchSize(ctx.fetchSize)
          implicit val session = NamedAutoSession(ctx.dbName)
          val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
          sql.collection.apply[C]()
        }
        case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
      }
    }

  }

JDBCQueryDemo.scala: functional test code

import scalikejdbc._
import JDBCEngine._
import configdbs._
import org.joda.time._
object JDBCQueryDemo extends App {
  ConfigDBsWithEnv("dev").setupAll()

  val ctx = JDBCContext(
    dbName = 'h2,
    statements = Seq("select * from members where id = ?"),
    parameters = Seq(Seq(2))
  )

  //data model
  case class Member(
                     id: Long,
                     name: String,
                     description: Option[String] = None,
                     birthday: Option[LocalDate] = None,
                     createdAt: DateTime)

  //data row converter
  val toMember = (rs: WrappedResultSet) => Member(
    id = rs.long("id"),
    name = rs.string("name"),
    description = rs.stringOpt("description"),
    birthday = rs.jodaLocalDateOpt("birthday"),
    createdAt = rs.jodaDateTime("created_at")
  )

  val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)

  println(s"members in vector: $vecMember")

  val ctx1 = ctx.copy(dbName = 'mysql)

  val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})

  println(s"selected name: $names")

  val ctx2 = ctx1.copy(dbName = 'postgres)
  val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})

  println(s"selected id+name: $idname")


  object SlickDAO {
    import slick.jdbc.H2Profile.api._

    case class CountyModel(id: Int, name: String)
    case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
      def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
      def name = column[String]("NAME",O.Length(64))
      def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
    }
    val CountyQuery = TableQuery[CountyTable]
    val filter = "Kansas"
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val statement = qry.result.statements.head
  }
  import SlickDAO._


  val slickCtx = JDBCContext(
    dbName = 'h2,
    statements = Seq(statement),
  )

  val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
    rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
  vecCounty.foreach(r => println(s"${r.id},${r.name}"))


}
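
This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published: 2018-01-29. In case of infringement, contact cloudcommunity@tencent.com for removal.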