专栏首页函数式编程语言及工具restapi(0)- 平台数据维护,写在前面

restapi(0)- 平台数据维护,写在前面

在云计算的推动下,软件系统发展趋于平台化。云平台系统一般都是分布式的集群系统,采用大数据技术。在这方面akka提供了比较完整的开发技术支持。我在上一个系列有关CQRS的博客中按照实际应用的要求对akka的一些开发技术进行了介绍。CQRS模式着重操作流程控制,主要涉及交易数据的管理。那么,作为交易数据产生过程中发挥验证作用的一系列基础数据如用户信息、商品信息、支付类型信息等又应该怎样维护呢?首先基础数据也应该是在平台水平上的,但数据的采集、维护是在系统前端的,比如一些web界面。所以平台基础数据维护系统是一套前后台结合的系统。对于一个开放的平台系统来说,应该能够适应各式各样的前端系统。一般来讲,平台通过定义一套api与前端系统集成是通用的方法。这套api必须遵循行业标准,技术要普及通用,这样才能支持各种异类前端系统功能开发。在这些要求背景下,相对gRPC, GraphQL来说,REST风格的http集成模式能得到更多开发人员的接受。

在有关CQRS系列博客里,我以akka-http作为系统集成工具的一种,零星地针对实际需要对http通信进行了介绍。在restapi这个系列里我想系统化的用akka-http构建一套完整的,REST风格数据维护和数据交换api,除CRUD之外还包括网络安全,文件交换等功能。我的计划是用akka-http搭建一个平台数据维护api的REST-CRUD框架,包含所有标配功能如用户验证、异常处理等。CRUD部分要尽量做成通用的generic,框架型的,能用一套标准的方法对任何数据表进行操作。

akka-http是一套http程序开发工具。它的Routing-DSL及数据序列化marshalling等都功能强大。特别是HttpResponse处理,一句complete解决了一大堆问题,magnet-pattern结合marshalling让它的使用更加方便。

在这篇讨论里先搭一个restapi的基本框架,包括客户端身份验证和使用权限。主要是示范如何达到通用框架的目的。这个在akka-http编程里主要体现在Routing-DSL的结构上,要求Route能够简洁易懂,如下:

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            }
            // ~ ...
          }
     }

我觉着这应该是框架型正确的方向:把所有功能都放在api下,统统经过权限验证。可以直接在后面不断加功能Route。

身份验证和使用权限也应该是一套标准的东西,但身份验证方法可能有所不同,特别是用户身份验证可能是通过独立的身份验证服务器实现的,对不同的验证机制应该有针对性的定制函数。构建身份管理的对象应该很方便或者很通用,如下:

  val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

AuthBase源码如下:

package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._

object AuthBase {
  type UserInfo = Map[String, Any]
  case class AuthBase(
                       algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
                       secret: String = "OpenSesame",
                       getUserInfo: Credentials => Option[UserInfo] = null) {
    ctx =>

    def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)
    def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
    def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

    def authenticateToken(credentials: Credentials): Option[String] =
      credentials match {
        case Credentials.Provided(token) =>
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }
        case _ => None
      }

    def getUserInfo(token: String): Option[UserInfo] = {
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
        case _ =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
      }
    }

    def issueJwt(userinfo: UserInfo): String = {
      val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
      Jwt.encode(claims, secret, algorithm)
    }
  }

}

我已经把多个通用的函数封装在里面了。再模拟一个用户身份管理对象:

package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {

  case class User(username: String, password: String, userInfo: UserInfo)
  val validUsers = Seq(User("johnny", "p4ssw0rd",Map("shopid" -> "1101", "userid" -> "101"))
    ,User("tiger", "secret", Map("shopid" -> "1101" , "userid" -> "102")))

  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case p @ Credentials.Provided(_) =>
        validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
      case _ => None
    }

}

好了,服务端示范代码中可以直接构建或者调用这些标准的类型了:

package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            }
            // ~ ...
          }
     }


  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
  .onComplete(_ => httpSys.terminate())


}

就是说后面的http功能可以直接插进这个框架,精力可以完全聚焦于具体每项功能的开发上了。

然后用下面的客户端测试代码:

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import akka.http.scaladsl.model._
import pdi.jwt._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util._
import scala.concurrent.duration._



object RestApiClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val parts = Jwt.decodeRawAll(jstr, "OpenSesame", Seq(JwtAlgorithm.HS256)) match {
      case Failure(exception) => println(s"Error: ${exception.getMessage}")
      case Success(value) =>
        println(((parse(value._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
    }

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))
    val apiRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/hello",
    ).addHeader(authentication)

    val futAuth: Future[HttpResponse] = Http().singleRequest(apiRequest)

    println(Await.result(futAuth,2 seconds))


    scala.io.StdIn.readLine()
    system.terminate()
  }

}

build.sbt

name := "restapi"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Akka(41): Http:DBTable-rows streaming - 数据库表行交换

      在前面一篇讨论里我们介绍了通过http进行文件的交换。因为文件内容是以一堆bytes来表示的,而http消息的数据部分也是byte类型的,所以我们可以直接用...

    用户1150956
  • restapi(1)- 文件上传下载服务

    上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次...

    用户1150956
  • akka-typed(6) - cluster:group router, cluster-load-balancing

    先谈谈akka-typed的router actor。route 分pool router, group router两类。我们先看看pool-router的使...

    用户1150956
  • restapi(1)- 文件上传下载服务

    上次对restapi开了个头,设计了一个包括了身份验证和使用权限的restful服务开发框架。这是一个通用框架,开发人员只要直接往里面加新功能就行了。虽然这次...

    用户1150956
  • akka-typed(6) - cluster:group router, cluster-load-balancing

    先谈谈akka-typed的router actor。route 分pool router, group router两类。我们先看看pool-router的使...

    用户1150956
  • wordpress后台添加左侧边栏菜单如何操作

      我们有时为了方便操作会把一些特定的链接添加到wordpress后台左侧菜单栏中,这个要如何实现呢?其实不会很难,使用两个WordPress内置函数就可以解决...

    ytkah
  • 使用 Kotlin + WebFlux/RxJava 2 实现响应式以及尝试正式版本的协程WebFluxRxJava 2Kotlin 1.3 的 Coroutines总结

    在前一篇文章《使用 Kotlin + Spring Boot 进行后端开发》中,曾介绍过尝试使用 Kotlin 来做后端开发。这一次,尝试 WebFlux 以及...

    fengzhizi715
  • [spark streaming] DStream 和 DStreamGraph 解析

    Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。

    UFO
  • restapi(2)- generic restful CRUD:通用的restful风格数据库表维护工具

    研究关于restapi的初衷是想搞一套通用的平台数据表维护http工具。前面谈过身份验证和使用权限、文件的上传下载,这次来到具体的数据库表维护。我们在这篇...

    用户1150956
  • 使用docker增加nginx

    ##安装编译docker 环境 我们这里采用的是nginx1.16.0 版本来进行编译安装的,如果有需要你可以自行更改成别的nginx版本,Dockerfile...

    张琳兮

扫码关注云+社区

领取腾讯云代金券