专栏首页函数式编程语言及工具restapi(2)- generic restful CRUD:通用的restful风格数据库表维护工具

restapi(2)- generic restful CRUD:通用的restful风格数据库表维护工具

研究关于restapi的初衷是想搞一套通用的平台数据表维护http工具。前面谈过身份验证和使用权限、文件的上传下载,这次来到具体的数据库表维护。我们在这篇示范里设计一套通用的对平台每一个数据表的标准维护方式。http服务端数据表维护CRUD有几个标准的部分组成:Model,Repository,Route。我们先看看这几个类型的基类:

trait ModelBase[M,E] {
  def to: M => E
  def from: E => M
}


trait RepoBase[M] {
  def getById(id: Long) : Future[Option[M]]
  def getAll : Future[Seq[M]]
  def filter(expr: M => Boolean): Future[Seq[M]]
  def save(row: M) : Future[AnyRef]
  def deleteById(id: Long) : Future[Int]
  def updateById(id: Long, row: M) : Future[Int]
}


abstract class RouteBase[M](val pathName: String, repository: RepoBase[M])(
  implicit m: Manifest[M]) extends Directives with JsonConverter {

  val route = path(pathName) {
    get {
      complete(futureToJson(repository.getAll))
    } ~ post {
      entity(as[String]) { json =>
        val extractedEntity = fromJson[M](json)
        complete(futureToJson(repository.save(extractedEntity)))
      }
    }
  } ~ path(pathName / LongNumber) { id =>
    get {
      complete(futureToJson(repository.getById(id)))
    } ~ put {
      entity(as[String]) { json =>
        val extractedEntity = fromJson[M](json)
        complete(futureToJsonAny(repository.updateById(id, extractedEntity)))
      }
    } ~ delete {
      complete(futureToJsonAny(repository.deleteById(id)))
    }
  }
}

很明显,Model是数据库表行类型的表达方式、Repository是数据库表操作方法、Route是操作方法的调用。下面是这几个类型的实例示范:

object MockModels {
  case class DataRow (
                     name: String,
                     age: Int
                     )
  case class Person(name: String, age: Int)
       extends ModelBase[Person,DataRow] {
    def to: Person => DataRow = p => DataRow (
      name = p.name,
      age = p.age
    )
    def from: DataRow => Person = m => Person(
      name = m.name,
      age = m.age
    )
  }
}


package com.datatech.restapi
import MockModels._

import scala.concurrent.Future
object MockRepo {
   class PersonRepo extends RepoBase[Person] {
    override def getById(id: Long): Future[Option[Person]] = Future.successful(Some(Person("johnny lee",23)))

    override def getAll: Future[Seq[Person]] = Future.successful(
      Seq(Person("jonny lee",23),Person("candy wang",45),Person("jimmy kowk",34))
    )

    override def filter(expr: Person => Boolean): Future[Seq[Person]] = Future.successful(
      Seq(Person("jonny lee",23),Person("candy wang",45),Person("jimmy kowk",34))
    )

    override def save(row: Person): Future[Person] = Future.successful(row)

    override def deleteById(id: Long): Future[Int] = Future.successful(1)

    override def updateById(id: Long, row: Person): Future[Int] = Future.successful(1)
  }

}


object PersonRoute {

  class PersonRoute(pathName: String, repo: RepoBase[Person])
     extends RouteBase[Person](pathName,repo)

  val route = new PersonRoute("person",new PersonRepo).route
}

Model代表数据表结构以及某种数据库的表行与Model之间的转换。而repository则代表某种数据库对库表具体操作的实现。我们把焦点拉回到RouteBase上来,这里包含了rest标准的get,post,put,delete http操作。实际上就是request/response处理机制。因为数据需要在线上on-the-wire来回移动,所以需要进行数据转换。通用的数据传输模式是:类->json->类,即序列化/反序列化。akka-http提供了丰富的Marshaller来实现自动的数据转换,但在编译时要提供Marshaller的隐式实例implicit instance,所以用类参数是无法通过编译的。只能手工进行类和json之间的转换。json转换是通过json4s实现的:

import java.text.SimpleDateFormat
import akka.http.scaladsl.model._
import org.json4s.JsonAST.{JNull, JString}
import org.json4s.{CustomSerializer, DefaultFormats, Formats}
import org.json4s.jackson.Serialization

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

trait DateSerializer {
  case object SqlDateSerializer extends CustomSerializer[java.sql.Date](format => ( {
    case JString(date) => {
      val utilDate = new SimpleDateFormat("yyyy-MM-dd").parse(date);
      new java.sql.Date(utilDate.getTime)
    }
    case JNull         => null
  }, {
    case date: java.sql.Date => JString(date.toString)
  }))

}

trait JsonConverter extends DateSerializer {
  implicit val formats: Formats = new DefaultFormats {
    override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd")
  } ++ List(SqlDateSerializer)

  def toJson(obj: AnyRef): String = {
    Serialization.write(obj)
  }

  def futureToJson(obj: Future[AnyRef]): Future[HttpResponse] = {
    obj.map { x =>
      HttpResponse(status = StatusCodes.OK, entity = HttpEntity(MediaTypes.`application/json`, Serialization.write(x)))
    }.recover {
      case ex => ex.printStackTrace(); HttpResponse(status = StatusCodes.InternalServerError)
    }

  }

  def futureToJsonAny(obj: Future[Any]): Future[HttpResponse] = {
    obj.map { x =>
      HttpResponse(status = StatusCodes.OK, entity = HttpEntity(MediaTypes.`application/json`, s"""{status : ${x}"""))
    }.recover {
      case ex => HttpResponse(status = StatusCodes.InternalServerError)
    }

  }

  def fromJson[E](json: String)(implicit m: Manifest[E]): E = {
    Serialization.read[E](json)
  }
}

当然对于一些特别的数据库表,我们还是希望使用akka-http强大的功能,如streaming。这时对于每一个这样的表单就需要要定制Route了。下面是一个定制Route的例子:

object MockModel {
  case class AddressRow (
                       province: String,
                       city: String,
                       street: String,
                       zip: String
                     )
  case class Address(
                      province: String,
                      city: String,
                      street: String,
                      zip: String
                    )
    extends ModelBase[Address,AddressRow] {
    def to: Address => AddressRow = addr => AddressRow (
      province = addr.province,
      city = addr.city,
      street = addr.street,
      zip = addr.zip
    )
    def from: AddressRow => Address = row => Address(
      province = row.province,
      city = row.city,
      street = row.street,
      zip = row.zip
    )
  }
}

  object AddressRepo {
     def getById(id: Long): Future[Option[Address]] = ???

     def getAll: Source[Address,_] = ???

     def filter(expr: Address => Boolean): Future[Seq[Address]] = ???

     def saveAll(rows: Source[Address,_]): Future[Int] = ???
     def saveAll(rows: Future[Seq[Address]]): Future[Int] = ???

     def deleteById(id: Long): Future[Address] = ???

     def updateById(id: Long, row: Address): Future[Address] = ???
  }


package com.datatech.restapi
import akka.actor._
import akka.stream._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.server._
import MockModels.Address
import MockRepo._


trait FormatConverter extends SprayJsonSupport with DefaultJsonProtocol{
  implicit val addrFormat = jsonFormat4(Address.apply)
}

case class AddressRoute(val pathName: String)(implicit akkaSys: ActorSystem) extends Directives with FormatConverter{
  implicit val mat = ActorMaterializer()
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 2, unordered = false)

  val route = path(pathName) {
    get {
      complete(AddressRepo.getAll)
    } ~ post {
      withoutSizeLimit {
          entity(asSourceOf[Address]) { source =>
 /*           val futSavedRows: Future[Seq[Address]] =
              source.runFold(Seq[Address]())((acc, addr) => acc :+ addr)
            onComplete(futSavedRows) { rows =>  */
            onComplete(AddressRepo.saveAll(source)) {rows =>
            complete { s"$rows address saved."}
          }
        }
      }

  } ~ path(pathName / LongNumber) { id =>
    get {
      complete(AddressRepo.getById(id)))
    } ~ put {
      entity(as[Address]) { addr =>
        onComplete(AddressRepo.updateById(id,addr)) { addr =>
        complete(s"address updated to: $addr")
      }
    } ~ delete {
        onComplete(AddressRepo.deleteById(id)) { addr =>
          complete(s"address deleted: $addr")
    }
  }
}

这样做可以灵活的使用akka-stream提供的功能。

上面的例子Mock PersonRoute.route可以直接贴在主route后面:

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("openspace") {
         (path("hello") & get) {
           complete(s"Hello, you are in open space.")
         }
       } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            PersonRoute.route
            // ~ ...
          }
     }

和前面的示范一样,我们还是写一个客户端来测试:

import akka.actor._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import scala.concurrent.duration._
import akka.http.scaladsl.Http
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

trait JsonFormats extends SprayJsonSupport with DefaultJsonProtocol
object JsonConverters extends JsonFormats {
  case class Person(name: String,age: Int)
  implicit val fmtPerson = jsonFormat2(Person)
}

object TestCrudClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val getAllRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/crud/person",
    ).addHeader(authentication)
    val futGet: Future[HttpResponse] = Http().singleRequest(getAllRequest)
    println(Await.result(futGet,2 seconds))
    scala.io.StdIn.readLine()

    import JsonConverters._

    val saveRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/api/crud/person"
    ).addHeader(authentication)
    val futPost: Future[HttpResponse] =
      for {
        reqEntity <- Marshal(Person("tiger chan",18)).to[RequestEntity]
        response <- Http().singleRequest(saveRequest.copy(entity=reqEntity))
      } yield response

    println(Await.result(futPost,2 seconds))
    scala.io.StdIn.readLine()
    system.terminate()
  }

}

下面是restapi发展到现在状态的源代码:

build.sbt

name := "restapi"

version := "0.3"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

RestApiServer.scala

package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
    path("auth") {
      authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
        post { complete(authenticator.issueJwt(userinfo))}
      }
    } ~
      pathPrefix("api") {
        authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            FileRoute(validToken)
              .route ~
            (pathPrefix("crud")) {
              PersonRoute.route
            }
          // ~ ...
        } ~
          (pathPrefix("crud")) {
            PersonRoute.route
            // ~ ...
          }
      }

  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())


}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • restapi(1)- 文件上传下载服务

    上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次...

    用户1150956
  • Akka(33): Http:Marshalling,to Json

      Akka-http是一项系统集成工具。这主要依赖系统之间的数据交换功能。因为程序内数据表达形式与网上传输的数据格式是不相同的,所以需要对程序高级结构化的数据...

    用户1150956
  • PICE(2):JDBCStreaming - gRPC-JDBC Service

       在一个akka-cluster环境里,从数据调用的角度上,JDBC数据库与集群中其它节点是脱离的。这是因为JDBC数据库不是分布式的,不具备节点位置透明化...

    用户1150956
  • restapi(1)- 文件上传下载服务

    上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次...

    用户1150956
  • Spring Boot实现文件上传

    Spring Boot默认支持文件上传,enabled这个可以不用配置,默认支持将文件写入磁盘,默认最大文件大小是1MB,默认最大请求大小是10MB,后面两个参...

    itlemon
  • restapi(0)- 平台数据维护,写在前面

    在云计算的推动下,软件系统发展趋于平台化。云平台系统一般都是分布式的集群系统,采用大数据技术。在这方面akka提供了比较完整的开发技术支持。我在上一个系列...

    用户1150956
  • SpringBoot集成RocketMQ,rocketmq_client.log日志文件配置

    项目启动时会在{user.home}/logs目录下创建一个rocketmq_client.log日志文件,文件全路径是:{user.home}/logs/ro...

    java乐园
  • 速读原著-Android应用开发入门教程(HelloActivity程序的运行)

    在软件开发的最初阶段,通常使用一个 Hello World 程序作为最简单的示例,本部分介绍一个 Android 中最简单应用程序,通过这部分内容可以了解到 A...

    cwl_java
  • TRTC TRTC Vue Demo + 后端密钥鉴权

    不禁感叹,Web的技术栈可真多啊,Vue是现行最流行的前端框架之一,相较于它的竞争对手React具有更加容易上手的特点。这里先发布一版Vue版本的Demo,因为...

    楚歌
  • flink异步io 转

    讨论主题:http:  //apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Pr...

    stys35

扫码关注云+社区

领取腾讯云代金券