Akka(30): Http:High-Level-Api,Routing DSL

  在上篇我们介绍了Akka-http Low-Level-Api。实际上这个Api提供了Server对进来的Http-requests进行处理及反应的自定义Flow或者转换函数的接入界面。我们看看下面官方文档给出的例子:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import scala.io.StdIn

object WebServer {

  def main(args: Array[String]) {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future map/flatmap in the end
    implicit val executionContext = system.dispatcher

    val requestHandler: HttpRequest => HttpResponse = {
      case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
        HttpResponse(entity = HttpEntity(
          ContentTypes.`text/html(UTF-8)`,
          "<html><body>Hello world!</body></html>"))

      case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
        HttpResponse(entity = "PONG!")

      case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
        sys.error("BOOM!")

      case r: HttpRequest =>
        r.discardEntityBytes() // important to drain incoming HTTP Entity stream
        HttpResponse(404, entity = "Unknown resource!")
    }

    val bindingFuture = Http().bindAndHandleSync(requestHandler, "localhost", 8080)
    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

  我们看到上面例子里的requestHandler函数用模式匹配方式对可能收到的HttpRequest进行了相关HttpResponse的对应。在对应的过程中可能还会按request要求进行一些Server端的运算作为例如Rest-Api这样的服务。不过对于大型的服务,模式匹配方式就会显得篇幅臃肿及模式僵化。Akka-http提供了一套routing DSL作为High-Level-Api的主要组成部分。用routing DSL代替Low-Level-Api的模式匹配方式可以更简练的编制HttpRequest到HttpResponse的转换服务,能更灵活高效的实现现代大型Rest-Api服务。routing DSL实现Rest-Api服务的方式是通过构建一个由组件Directives组合而成的多个多层三明治结构Route。Route是一个类型:

  type Route = RequestContext ⇒ Future[RouteResult]

下面是个Route例子:

    val route: Flow[HttpRequest, HttpResponse, NotUsed]=
      get {
        pathSingleSlash {
          complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,"<html><body>Hello world!</body></html>"))
        } ~
          path("ping") {
            complete("PONG!")
          } ~
          path("crash") {
            sys.error("BOOM!")
          }
      }

在上期讨论的例子里我们可以这样使用route:

  val futBinding: Future[Http.ServerBinding] =
    connSource.to { Sink.foreach{ connection =>
      println(s"client address ${connection.remoteAddress}")
      //    connection handleWith flow
      //    connection handleWithSyncHandler syncHandler
      //connection handleWithAsyncHandler asyncHandler
      connection handleWith route
    }}.run()

handleWith(flow)的参数应该是Flow[HttpRequest,HttpResponse,_]才对呀?这个我们先看看RouteResult对象: 

/**
 * The result of handling a request.
 *
 * As a user you typically don't create RouteResult instances directly.
 * Instead, use the methods on the [[RequestContext]] to achieve the desired effect.
 */
sealed trait RouteResult extends javadsl.server.RouteResult

object RouteResult {
  final case class Complete(response: HttpResponse) extends javadsl.server.Complete with RouteResult {
    override def getResponse = response
  }
  final case class Rejected(rejections: immutable.Seq[Rejection]) extends javadsl.server.Rejected with RouteResult {
    override def getRejections = rejections.map(r ⇒ r: javadsl.server.Rejection).toIterable.asJava
  }

  implicit def route2HandlerFlow(route: Route)(implicit
    routingSettings: RoutingSettings,
                                               parserSettings:   ParserSettings,
                                               materializer:     Materializer,
                                               routingLog:       RoutingLog,
                                               executionContext: ExecutionContext = null,
                                               rejectionHandler: RejectionHandler = RejectionHandler.default,
                                               exceptionHandler: ExceptionHandler = null): Flow[HttpRequest, HttpResponse, NotUsed] =
    Route.handlerFlow(route)
}

这里有个隐式转换route2HandlerFlow把Route转换成Flow[HttpRequest,HttpResponse,NotUsed],问题解决了。

从type Route=RequestContext => Future[RouteResult]可以看到:Route就是一个把RequestContext转换成Future[RouteResult]的函数。RequestContext实质上封装了个Request以及对Request进行操作的环境、配置和工具:

/**
 * This class is not meant to be extended by user code.
 *
 * Immutable object encapsulating the context of an [[akka.http.scaladsl.model.HttpRequest]]
 * as it flows through a akka-http Route structure.
 */
@DoNotInherit
trait RequestContext {

  /** The request this context represents. Modelled as a `val` so as to enable an `import ctx.request._`. */
  val request: HttpRequest

  /** The unmatched path of this context. Modelled as a `val` so as to enable an `import ctx.unmatchedPath._`. */
  val unmatchedPath: Uri.Path

  /**
   * The default ExecutionContext to be used for scheduling asynchronous logic related to this request.
   */
  implicit def executionContext: ExecutionContextExecutor
...
}

Route是一种可组合组件。我们可以用简单的Route组合成更多层次的Route。下面是组合Route的几种方式:

1、Route转化:对输入的request,输出的response进行转化处理后把实际运算托付给下一层内部(inner)Route

2、筛选Route:只容许符合某种条件的Route通过并拒绝其它不符合条件的Route

3、链接Route:假如一个Route被拒绝,尝试下一个Route。这个是通过 ~ 操作符号实现的

在Akka-http的routing DSL里这些Route组合操作是通过Directive实现的。Akka-http提供了大量现成的Directive,我们也可以自定义一些特殊功能的Directive,详情可以查询官方文件或者api文件。

Directive的表达形式如下:

dirname(arguments) { extractions =>
  ... // 内层inner route
}

下面是Directive的一些用例: 

下面的三个route效果相等:

val route: Route = { ctx =>
  if (ctx.request.method == HttpMethods.GET)
    ctx.complete("Received GET")
  else
    ctx.complete("Received something else")
}

val route =
  get {
    complete("Received GET")
  } ~
  complete("Received something else")
  
val route =
  get { ctx =>
    ctx.complete("Received GET")
  } ~
  complete("Received something else")

下面列出一些Directive的组合例子:

val route: Route =
  path("order" / IntNumber) { id =>
    get {
      complete {
        "Received GET request for order " + id
      }
    } ~
    put {
      complete {
        "Received PUT request for order " + id
      }
    }
  }

def innerRoute(id: Int): Route =
  get {
    complete {
      "Received GET request for order " + id
    }
  } ~
  put {
    complete {
      "Received PUT request for order " + id
    }
  }
val route: Route = path("order" / IntNumber) { id => innerRoute(id) }

val route =
  path("order" / IntNumber) { id =>
    (get | put) { ctx =>
      ctx.complete(s"Received ${ctx.request.method.name} request for order $id")
    }
  }

val route =
  path("order" / IntNumber) { id =>
    (get | put) {
      extractMethod { m =>
        complete(s"Received ${m.name} request for order $id")
      }
    }
  }

val getOrPut = get | put
val route =
  path("order" / IntNumber) { id =>
    getOrPut {
      extractMethod { m =>
        complete(s"Received ${m.name} request for order $id")
      }
    }
  }

val route =
  (path("order" / IntNumber) & getOrPut & extractMethod) { (id, m) =>
    complete(s"Received ${m.name} request for order $id")
  }

val orderGetOrPutWithMethod =
  path("order" / IntNumber) & (get | put) & extractMethod
val route =
  orderGetOrPutWithMethod { (id, m) =>
    complete(s"Received ${m.name} request for order $id")
  }

上面例子里的~ & | 定义如下:

object RouteConcatenation extends RouteConcatenation {

  class RouteWithConcatenation(route: Route) {
    /**
     * Returns a Route that chains two Routes. If the first Route rejects the request the second route is given a
     * chance to act upon the request.
     */
    def ~(other: Route): Route = { ctx ⇒
      import ctx.executionContext
      route(ctx).fast.flatMap {
        case x: RouteResult.Complete ⇒ FastFuture.successful(x)
        case RouteResult.Rejected(outerRejections) ⇒
          other(ctx).fast.map {
            case x: RouteResult.Complete               ⇒ x
            case RouteResult.Rejected(innerRejections) ⇒ RouteResult.Rejected(outerRejections ++ innerRejections)
          }
      }
    }
  }
}

  /**
   * Joins two directives into one which runs the second directive if the first one rejects.
   */
  def |[R >: L](that: Directive[R]): Directive[R] =
    recover(rejections ⇒ directives.BasicDirectives.mapRejections(rejections ++ _) & that)(that.ev)

  /**
   * Joins two directives into one which extracts the concatenation of its base directive extractions.
   * NOTE: Extraction joining is an O(N) operation with N being the number of extractions on the right-side.
   */
  def &(magnet: ConjunctionMagnet[L]): magnet.Out = magnet(this)

我们可以从上面这些示范例子得出结论:Directive的组合能力是routing DSL的核心。来看看Directive的组合能力是如何实现的。Directive类定义如下:

//#basic
abstract class Directive[L](implicit val ev: Tuple[L]) {

  /**
   * Calls the inner route with a tuple of extracted values of type `L`.
   *
   * `tapply` is short for "tuple-apply". Usually, you will use the regular `apply` method instead,
   * which is added by an implicit conversion (see `Directive.addDirectiveApply`).
   */
  def tapply(f: L ⇒ Route): Route
  ...
}
  /**
   * Constructs a directive from a function literal.
   */
  def apply[T: Tuple](f: (T ⇒ Route) ⇒ Route): Directive[T] =
    new Directive[T] { def tapply(inner: T ⇒ Route) = f(inner) }

  /**
   * A Directive that always passes the request on to its inner route (i.e. does nothing).
   */
  val Empty: Directive0 = Directive(_(()))
...
  implicit class SingleValueModifiers[T](underlying: Directive1[T]) extends AnyRef {
    def map[R](f: T ⇒ R)(implicit tupler: Tupler[R]): Directive[tupler.Out] =
      underlying.tmap { case Tuple1(value) ⇒ f(value) }

    def flatMap[R: Tuple](f: T ⇒ Directive[R]): Directive[R] =
      underlying.tflatMap { case Tuple1(value) ⇒ f(value) }

    def require(predicate: T ⇒ Boolean, rejections: Rejection*): Directive0 =
      underlying.filter(predicate, rejections: _*).tflatMap(_ ⇒ Empty)

    def filter(predicate: T ⇒ Boolean, rejections: Rejection*): Directive1[T] =
      underlying.tfilter({ case Tuple1(value) ⇒ predicate(value) }, rejections: _*)
  }
}

注意implicit ev: Tuple[L]是给compiler的证例,它要求Tuple[L]存在于可视域。Akka-http提供了所有22个TupleXX[L]的隐形实例。再注意implicit class singleValueModifiers[T]:它提供了多层Directive的自动展平,能够实现下面的自动转换结果:

Directive1[T] = Directive[Tuple1[T]]
Directive1[Tuple2[M,N]] = Directive[Tuple1[Tuple2[M,N]]] = Directive[Tuple2[M,N]]
Directive1[Tuple3[M,N,G]] = ... = Directive[Tuple3[M,N,G]]
Directive1[Tuple4[M1,M2,M3,M4]] = ... = Directive[Tuple4[M1,M2,M3,M4]]
...
Directive1[Unit] = Directive0

Directive1,Directive0:

  type Directive0 = Directive[Unit]
  type Directive1[T] = Directive[Tuple1[T]]

下面是这几种Directive的使用模式:

  dirname { route }                  //Directive0
  dirname[L] { L => route }          //Directive1[L]
  dirname[T] { (T1,T2...) => route}  //Directive[T]

任何类型值到Tuple的自动转换是通过Tupler类实现的:

/**
 * Provides a way to convert a value into an Tuple.
 * If the value is already a Tuple then it is returned unchanged, otherwise it's wrapped in a Tuple1 instance.
 */
trait Tupler[T] {
  type Out
  def OutIsTuple: Tuple[Out]
  def apply(value: T): Out
}

object Tupler extends LowerPriorityTupler {
  implicit def forTuple[T: Tuple]: Tupler[T] { type Out = T } =
    new Tupler[T] {
      type Out = T
      def OutIsTuple = implicitly[Tuple[Out]]
      def apply(value: T) = value
    }
}

private[server] abstract class LowerPriorityTupler {
  implicit def forAnyRef[T]: Tupler[T] { type Out = Tuple1[T] } =
    new Tupler[T] {
      type Out = Tuple1[T]
      def OutIsTuple = implicitly[Tuple[Out]]
      def apply(value: T) = Tuple1(value)
    }
}

我的理解是:Route里Directive的主要功能可以分成两部分:一是如程序菜单拣选,二是对Request,Response,Entity的读写。我们把第二项功能放在以后的讨论里,下面就提供一些RestApi的菜单拣选样例:

trait UsersApi extends JsonMappings{
  val usersApi =
    (path("users") & get ) {
       complete (UsersDao.findAll.map(_.toJson))
    }~
    (path("users"/IntNumber) & get) { id =>
        complete (UsersDao.findById(id).map(_.toJson))
    }~
    (path("users") & post) { entity(as[User]) { user =>
        complete (UsersDao.create(user).map(_.toJson))
      }
    }~
    (path("users"/IntNumber) & put) { id => entity(as[User]) { user =>
        complete (UsersDao.update(user, id).map(_.toJson))
      }
    }~
    (path("users"/IntNumber) & delete) { userId =>
      complete (UsersDao.delete(userId).map(_.toJson))
    }
}

trait CommentsApi extends JsonMappings{
  val commentsApi =
    (path("users"/IntNumber/"posts"/IntNumber/"comments") & get ) {(userId, postId) =>
       complete (CommentsDao.findAll(userId, postId).map(_.toJson))
    }~
      (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & get) { (userId, postId, commentId) =>
        complete (CommentsDao.findById(userId, postId, commentId).map(_.toJson))
    }~
      (path("comments") & post) { entity(as[Comment]) { comment =>
        complete (CommentsDao.create(comment).map(_.toJson))
      }
    }~
      (path("users"/IntNumber/"posts"/IntNumber/"comments"/IntNumber) & put) { (userId, postId, commentId) => entity(as[Comment]) { comment =>
        complete (CommentsDao.update(comment, commentId).map(_.toJson))
      }
    }~
      (path("comments"/IntNumber) & delete) { commentId =>
        complete (CommentsDao.delete(commentId).map(_.toJson))
    }
}

trait PostsApi extends JsonMappings{
  val postsApi =
    (path("users"/IntNumber/"posts") & get){ userId =>
      complete (PostsDao.findUserPosts(userId).map(_.toJson))
    }~
    (path("users"/IntNumber/"posts"/IntNumber) & get) { (userId,postId) =>
      complete (PostsDao.findByUserIdAndId(userId, postId).map(_.toJson))
    }~
    (path("users"/IntNumber/"posts") & post) { userId => entity(as[Post]) { post =>
      complete (PostsDao.create(post).map(_.toJson))
    }}~
    (path("users"/IntNumber/"posts"/IntNumber) & put) { (userId, id) => entity(as[Post]) { post =>
      complete (PostsDao.update(post, id).map(_.toJson))
    }}~
    (path("users"/IntNumber/"posts"/IntNumber) & delete) { (userId, postId) =>
      complete (PostsDao.delete(postId).map(_.toJson))
    }
}

  val routes =
    pathPrefix("v1") {
      usersApi ~
      postsApi ~
      commentsApi
    } ~ path("")(getFromResource("public/index.html"))

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏hbbliyong

WPF Trigger for IsSelected in a DataTemplate for ListBox items

<DataTemplate DataType="{x:Type vm:HeaderSlugViewModel}"> <vw:HeaderSlug...

4214
来自专栏Ceph对象存储方案

Luminous版本PG 分布调优

Luminous版本开始新增的balancer模块在PG分布优化方面效果非常明显,操作也非常简便,强烈推荐各位在集群上线之前进行这一操作,能够极大的提升整个集群...

3625
来自专栏陈仁松博客

ASP.NET Core 'Microsoft.Win32.Registry' 错误修复

今天在发布Asp.net Core应用到Azure的时候出现错误InvalidOperationException: Cannot find compilati...

5208
来自专栏张善友的专栏

Miguel de Icaza 细说 Mix 07大会上的Silverlight和DLR

Mono之父Miguel de Icaza 详细报道微软Mix 07大会上的Silverlight和DLR ,上面还谈到了Mono and Silverligh...

2997
来自专栏一个会写诗的程序员的博客

Spring Reactor 项目核心库Reactor Core

Non-Blocking Reactive Streams Foundation for the JVM both implementing a Reactiv...

2732
来自专栏张善友的专栏

LINQ via C# 系列文章

LINQ via C# Recently I am giving a series of talk on LINQ. the name “LINQ via C...

2965
来自专栏芋道源码1024

熔断器 Hystrix 源码解析 —— 断路器 HystrixCircuitBreaker

本文主要基于 Hystrix 1.5.X 版本 1. 概述 2. HystrixCircuitBreaker 3. HystrixCircuitBreaker....

5717
来自专栏杨龙飞前端

scrollto 到指定位置

2894
来自专栏我和未来有约会

Kit 3D 更新

Kit3D is a 3D graphics engine written for Microsoft Silverlight. Kit3D was inita...

2886
来自专栏闻道于事

js登录滑动验证,不滑动无法登陆

js的判断这里是根据滑块的位置进行判断,应该是用一个flag判断 <%@ page language="java" contentType="text/html...

8368

扫码关注云+社区