Akka(42): Http:身份验证 - authentication, authorization and use of raw headers

   当我们把Akka-http作为数据库数据交换工具时,数据是以Source[ROW,_]形式存放在Entity里的。很多时候除数据之外我们可能需要进行一些附加的信息传递如对数据的具体处理方式等。我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。在客户端我们把附加消息放在HttpRequest的raw header里,如下:

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
                   .addHeader(RawHeader("action","insert:county"))

在这里客户端注明上传数据应插入county表。服务端可以像下面这样获取这项信息:

             optionalHeaderValueByName("action") {
                case Some(action) =>
                  entity(asSourceOf[County]) { source =>
                    val futofNames: Future[List[String]] =
                      source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                    complete(s"Received rows for $action")
                  }
                case None => complete ("No action specified!")
              }

Akka-http通过Credential类的Directive提供了authentication和authorization。在客户端可以用下面的方法提供自己的用户身份信息:

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

服务端对客户端的身份验证处理方法如下:

  import akka.http.scaladsl.server.directives.Credentials
  def myUserPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasAdminPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }

下面是Credential-Directive的使用方法:

         authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }

下面是本次讨论的示范代码:

客户端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import scala.util._
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model._
import spray.json._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  implicit val countyFormat = jsonFormat2(County)
}

object HttpClientDemo extends App {
  import Converters._

  implicit val sys = ActorSystem("ClientSys")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()

  import akka.util.ByteString
  import akka.http.scaladsl.model.HttpEntity.limitableByteSource

  val source: Source[County,NotUsed] = Source(1 to 5).map {i => County(i, s"广西壮族自治区地市县编号 #$i")}
  def countyToByteString(c: County) = {
    ByteString(c.toJson.toString)
  }
  val flowCountyToByteString : Flow[County,ByteString,NotUsed] = Flow.fromFunction(countyToByteString)

  val rowBytes = limitableByteSource(source via flowCountyToByteString)

  import akka.http.scaladsl.model.headers._
  val request = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/rows")
    .addHeader(RawHeader("action","insert:county"))
    .addCredentials(BasicHttpCredentials("john", "p4ssw0rd"))

  val data = HttpEntity(
    ContentTypes.`application/json`,
    rowBytes
  )

  def uploadRows(request: HttpRequest, dataEntity: RequestEntity) = {
    val futResp = Http(sys).singleRequest(
      request.copy(entity = dataEntity)
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Success(_) => println("Unable to Upload file!")
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")

      }
  }


  uploadRows(request,data)

  scala.io.StdIn.readLine()

  sys.terminate()

}

服务端:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka._
import akka.http.scaladsl.common._
import spray.json.DefaultJsonProtocol
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import scala.concurrent._
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model._

trait MyFormats extends SprayJsonSupport with DefaultJsonProtocol
object Converters extends MyFormats {
  case class County(id: Int, name: String)
  val source: Source[County, NotUsed] = Source(1 to 5).map { i => County(i, s"中国广东省地区编号 #$i") }
  implicit val countyFormat = jsonFormat2(County)
}

object HttpServerDemo extends App {

  import Converters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher


  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  def postExceptionHandler: ExceptionHandler =
    ExceptionHandler {
      case _: RuntimeException =>
        extractRequest { req =>
          req.discardEntityBytes()
          complete((StatusCodes.InternalServerError.intValue, "Upload Failed!"))
        }
    }

  import akka.http.scaladsl.server.directives.Credentials
  def userPassAuthenticator(credentials: Credentials): Future[Option[User]] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    credentials match {
      case p @ Credentials.Provided(id) =>
        Future {
          // potentially
          if (p.verify("p4ssw0rd")) Some(User(id))
          else None
        }
      case _ => Future.successful(None)
    }
  }

  case class User(name: String)
  val validUsers = Set("john","peter","tiger","susan")
  def hasPermissions(user: User): Future[Boolean] = {
    implicit val blockingDispatcher = httpSys.dispatchers.lookup("akka-httpblocking-ops-dispatcher")
    Future.successful(validUsers.contains(user.name))
  }

  val route =
    path("rows") {
      get {
        complete {
          source
        }
      } ~
        post {
          authenticateBasicAsync(realm = "secure site", userPassAuthenticator) { user =>
            authorizeAsync(_ => hasPermissions(user)) {
              withoutSizeLimit {
                handleExceptions(postExceptionHandler) {
                  optionalHeaderValueByName("action") {
                    case Some(action) =>
                      entity(asSourceOf[County]) { source =>
                        val futofNames: Future[List[String]] =
                          source.runFold(List[String](""))((acc, b) => acc ++ List(b.name))
                        complete(s"Received rows for $action sent from $user")
                      }
                    case None => complete(s"$user did not specify action for uploaded rows!")
                  }
                }
              }
            }
          }
        }
    }

  val (port, host) = (8011,"localhost")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏熊二哥

Java基础组件快速入门

最近需要上线很多新的JAVA项目,然而很多JAVA的相关库都不太熟悉,项目实现起来遇到了不小阻力,熬了好几天夜。现在手头的工作基本完成了,因此打算好好来归纳下j...

2396
来自专栏Java Edge

放弃 Calender优雅地使用Joda-Time吧Joda 大型项目Joda 简介Joda 和 JDK 互操作性Joda 的关键日期/时间概念

2817
来自专栏一个会写诗的程序员的博客

光剑评注:其实,说了这么多废话,无非就是: 一切皆是映射。不管是嵌套 XML,还是 Lisp 嵌套括号,还是 XXX 的 Map 数据结构,一切都是树形结构——映射。Lisp的本质(The Natur

http://www.defmacro.org/ramblings/lisp.html

1252
来自专栏HansBug's Lab

1740: [Usaco2005 mar]Yogurt factory 奶酪工厂

1740: [Usaco2005 mar]Yogurt factory 奶酪工厂 Time Limit: 5 Sec  Memory Limit: 64 MB ...

2515
来自专栏计算机视觉与深度学习基础

Leetcode 289. Game of Life

According to the Wikipedia's article: "The Game of Life, also known simply as ...

3314
来自专栏NetCore

[原创]Fluent NHibernate之旅(四)-- 关系(下)

最近一直忙着准备去旅行的东东,所以进度慢下来了,明天就要出发了,嘿嘿,在出发前,把多对多给写完吧。如果你第一次看这个系列,可以先看看先前几篇,了解下。 一、开篇...

20810
来自专栏ml

HDUOJ----1234 开门人和关门人(浙江大学考研题)

开门人和关门人 Time Limit: 2000/1000 MS (Java/Others)    Memory Limit: 65536/32768 K (J...

2655
来自专栏happyJared

设计模式入门

  俗话说,好记性也不如烂笔头,最近开始阅读设计模式这方面的书籍,算是借此开个好头,把一些理解的和不太理解的都写下来。本人工作时间不长,经验、资历各方面也还比较...

561
来自专栏coding

node.js要不要加分号

我觉得node.js在语法层面挺拧巴的,要么就像PHP那样严格要求加分号,要么就像python一律不加分号。而node.js却表现出一副欲拒还迎的姿态,让人感觉...

1002
来自专栏我是攻城师

Java时间处理神器之Joda-Time

3995

扫码关注云+社区