Akka(29): Http:Server-Side-Api,Low-Level-Api

 Akka-http针对Connection的两头都提供了方便编程的Api,分别是Server-Side-Api和Client-Side-Api。通过这两个Api可以大大提高编程效率。当然,上期我们提到过,Http-Server是Akka-http的核心部分,所有系统集成功能都是在Server-Side实现的。Akka-http-Server-Side-Api可以说是最先进的Http-Server编程工具,支持:

  • Full support for HTTP persistent connections
  • Full support for HTTP pipelining
  • Full support for asynchronous HTTP streaming including “chunked” transfer encoding accessible through an idiomatic API
  • Optional SSL/TLS encryption
  • WebSocket support

Server-Side-Api又分两个层次:Low-level-Server-Side-Api和High-level-Server-Side-Api。Low-level-server-api支持HTTP/1.1Server所有功能,包括:

  • Connection management
  • Parsing and rendering of messages and headers
  • Timeout management (for requests and connections)
  • Response ordering (for transparent pipelining support)

其它Server功能如请求解析request routing,文件服务file serving,数据压缩compression等都放在了High-level-server-api里。Akka-http是基于Akka-stream编写的,所以我们需要从Akka-stream运算模式来理解Akka-http的类型表现形式。

一个Http-Server是绑定在一个Socket上来接收客户端上传的request进行相关的服务提供的。Server对Socket的绑定在Akka-http里的可以Stream形式来表现:

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
  Http().bind(interface = "localhost", port = 8080)

Server-Side Socket绑定实际上就是一个Akka-stream-source:Source[IncomingConnection]:

  /**
   * Creates a [[akka.stream.scaladsl.Source]] of [[akka.http.scaladsl.Http.IncomingConnection]] instances which represents a prospective HTTP server binding
   * on the given `endpoint`.
   *
   * If the given port is 0 the resulting source can be materialized several times. Each materialization will
   * then be assigned a new local port by the operating system, which can then be retrieved by the materialized
   * [[akka.http.scaladsl.Http.ServerBinding]].
   *
   * If the given port is non-zero subsequent materialization attempts of the produced source will immediately
   * fail, unless the first materialization has already been unbound. Unbinding can be triggered via the materialized
   * [[akka.http.scaladsl.Http.ServerBinding]].
   *
   * If an [[ConnectionContext]] is given it will be used for setting up TLS encryption on the binding.
   * Otherwise the binding will be unencrypted.
   *
   * If no `port` is explicitly given (or the port value is negative) the protocol's default port will be used,
   * which is 80 for HTTP and 443 for HTTPS.
   *
   * To configure additional settings for a server started using this method,
   * use the `akka.http.server` config section or pass in a [[akka.http.scaladsl.settings.ServerSettings]] explicitly.
   */
  def bind(interface: String, port: Int = DefaultPortForProtocol,
           connectionContext: ConnectionContext = defaultServerHttpContext,
           settings:          ServerSettings    = ServerSettings(system),
           log:               LoggingAdapter    = system.log)(implicit fm: Materializer): Source[Http.IncomingConnection, Future[ServerBinding]] = {
    val fullLayer = fuseServerBidiFlow(settings, connectionContext, log)

    tcpBind(interface, choosePort(port, connectionContext), settings)
      .map(incoming ⇒ {
        val serverFlow = fullLayer.addAttributes(prepareAttributes(settings, incoming)) join incoming.flow
        IncomingConnection(incoming.localAddress, incoming.remoteAddress, serverFlow)
      })
      .mapMaterializedValue(materializeTcpBind)
  }

run这个Source[IncomingConnection]产生一串连接Connection: 

  /**
   * Represents one accepted incoming HTTP connection.
   */
  final case class IncomingConnection(
    localAddress:  InetSocketAddress,
    remoteAddress: InetSocketAddress,
    flow:          Flow[HttpResponse, HttpRequest, NotUsed]) {

    /**
     * Handles the connection with the given flow, which is materialized exactly once
     * and the respective materialization result returned.
     */
    def handleWith[Mat](handler: Flow[HttpRequest, HttpResponse, Mat])(implicit fm: Materializer): Mat =
      flow.joinMat(handler)(Keep.right).run()

    /**
     * Handles the connection with the given handler function.
     */
    def handleWithSyncHandler(handler: HttpRequest ⇒ HttpResponse)(implicit fm: Materializer): Unit =
      handleWith(Flow[HttpRequest].map(handler))

    /**
     * Handles the connection with the given handler function.
     */
    def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse], parallelism: Int = 1)(implicit fm: Materializer): Unit =
      handleWith(Flow[HttpRequest].mapAsync(parallelism)(handler))
  }

IncomingConnection类型提供了个handleWith这样的streaming函数进行request到response的转换。用户可以下面的方式提供自定义的转换方法:

调用handleWith传入Flow[HttpRequest,HttpResponse,_],如:

  def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
    HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
  val flow = Flow.fromFunction(req2Resp)

提供HttpRequest=>HttpResponse函数传人handleWithSyncHandler:

def syncHandler: HttpRequest => HttpResponse = {
    case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entiy,_protocol) =>
      HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))

    case req: HttpRequest =>
      req.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

提供HttpRequest=>Future[HttpResponse]函数传人handleWithASyncHandler:

  def asyncHandler: HttpRequest => Future[HttpResponse] = {
    case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entiy,_protocol) => Future {
      HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }

    case req: HttpRequest => Future {
      req.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
    }
  }

run Source[IncomingConnection,Future[ServerBinding]]返回结果为Future[ServerBinding]:

  val futBinding: Future[Http.ServerBinding] =
    connSource.to { Sink.foreach{ connection =>
        println(s"client address ${connection.remoteAddress}")
  //      connection handleWith flow
  //    connection handleWithSyncHandler syncHandler
      connection handleWithAsyncHandler asyncHandler
    }}.run()

我们可以通过ServerBinding来释放绑定的Socket:

 /**
   * Represents a prospective HTTP server binding.
   *
   * @param localAddress  The local address of the endpoint bound by the materialization of the `connections` [[akka.stream.scaladsl.Source]]
   *
   */
  final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {

    /**
     * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
     * [[akka.stream.scaladsl.Source]]
     *
     * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
     */
    def unbind(): Future[Unit] = unbindAction()
  }

我们可以调用这个unbind():

  futBinding.flatMap(_.unbind())

整个示范源代码如下:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import scala.concurrent._

object LowLevelServerApi extends App {
  implicit val httpSys = ActorSystem("actorSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEc = httpSys.dispatcher

  val (interface,port) = ("localhost",8088)
  val connSource: Source[Http.IncomingConnection,Future[Http.ServerBinding]] =
    Http().bind(interface,port)

  def req2Resp: HttpRequest => HttpResponse = _ => HttpResponse(entity=
    HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))
  val flow = Flow.fromFunction(req2Resp)

  def syncHandler: HttpRequest => HttpResponse = {
    case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entiy,_protocol) =>
      HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>"))

    case req: HttpRequest =>
      req.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

  def asyncHandler: HttpRequest => Future[HttpResponse] = {
    case HttpRequest(HttpMethods.GET,Uri.Path("/"),_headers,_entiy,_protocol) => Future {
      HttpResponse(entity=
        HttpEntity(ContentTypes.`text/html(UTF-8)`,"<h> Hello World! </h>")) }

    case req: HttpRequest => Future {
      req.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
    }
  }

  val futBinding: Future[Http.ServerBinding] =
    connSource.to { Sink.foreach{ connection =>
        println(s"client address ${connection.remoteAddress}")
  //      connection handleWith flow
  //    connection handleWithSyncHandler syncHandler
      connection handleWithAsyncHandler asyncHandler
    }}.run()

  println(s"Server running at $interface $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  futBinding.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())
  
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊JerseyEurekaHttpClient的参数

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/jersey/J...

382
来自专栏大内老A

WCF后续之旅(2): 如何对Channel Layer进行扩展——创建自定义Channel

在上一篇文章中,我们通过一个直接借助BasicHttpBinding对象实现Client和Server端进行通信的例子,对WCF channel layer进行...

1955
来自专栏码匠的流水账

聊聊spring kafka的retry

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapt...

412
来自专栏码匠的流水账

聊聊spring cloud gateway的PreserveHostHeaderGatewayFilter

本文主要研究下spring cloud gateway的PreserveHostHeaderGatewayFilter

562
来自专栏DOTNET

asp.net web api 版本控制

版本控制 版本控制的方法有很多,这里提供一种将Odata与普通web api版本控制机制统一的方法,但也可以单独控制,整合控制与单独控制主要的不同是:整合控制通...

3736
来自专栏NetCore

EventBus In eShop -- 解析微软微服务架构Demo(四)

引言 大家好像对分析源码厌倦了,说实在我也会厌倦,不过不看是无法分析其后面的东西,从易到难是一个必要的过程。 今天说下EventBus,前几天园里的大神已经把其...

2278
来自专栏张善友的专栏

.NET4.0的可扩展缓存系统

.NET Framework中,叫做System.Runtime.Caching,这不仅是个缓存库,还是个框架,可以在上面开发自己的库。ObjectCache定...

1918
来自专栏Linux驱动

STM32-正弦波可调(50HZ~20KHZ可调、峰峰值0~3.3V可调)

1.原理: 通过定时器每隔一段时间触发一次DAC转换,然后通过DMA发送正玄波码表值给DAC. 当需要改变频率HZ时,只需要修改定时器频率即可(最高只能达到20...

5039
来自专栏函数式编程语言及工具

Akka(39): Http:File streaming-文件交换

 所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表...

2259
来自专栏数据库新发现

VCS学习笔记

VERITAS Cluster Server(VCS) connects, or clusters, multiple, independent systems...

442

扫码关注云+社区