Akka(37): Http: client-side operation modes

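Besides the Connection-Level and Host-Level APIs, Akka HTTP offers a third, very convenient client mode: the Request-Level API. It hides the notion of a connection altogether, so at any time you can call singleRequest directly to talk to the server. The following examples demonstrate how singleRequest is used: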

  (for {
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))
    message <- Unmarshal(response.entity).to[String]
  } yield message).andThen {
    case Success(msg) => println(s"Received message: $msg")
    case Failure(err) => println(s"Error: ${err.getMessage}")
  }.andThen {case _ => sys.terminate()}

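This is a GET operation: Http().singleRequest sends the HttpRequest straight to the server URI and returns the HttpResponse as a Future[HttpResponse]. Every function in the chain returns a Future[?], so a for-comprehension keeps all of the actual computation inside the Future context. The next example shows the client uploading data: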

 (for {
    entity <- Marshal("Wata hell you doing?").to[RequestEntity]
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))
    message <- Unmarshal(response.entity).to[String]
  } yield message).andThen {
    case Success(msg) => println(s"Received message: $msg")
    case Failure(err) => println(s"Error: ${err.getMessage}")
  }.andThen {case _ => sys.terminate()}
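
Both snippets above end with andThen. As a minimal sketch of what that combinator does, the following uses only the Scala standard library; fetch and decode are hypothetical stand-ins for the request and unmarshalling steps, not Akka HTTP calls. Each andThen runs its side effect and then passes the original result through unchanged, and the callbacks run in the order in which they were attached.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object AndThenSketch {
  // Hypothetical stand-ins for "send the request" and "unmarshal the entity".
  def fetch(): Future[String] = Future.successful("hello, how are you?")
  def decode(raw: String): Future[String] = Future(raw.toUpperCase)

  // Returns the final value together with the side effects that were recorded.
  def run(): (String, Vector[String]) = {
    var log = Vector.empty[String]
    val result: Future[String] =
      (for {
        raw <- fetch()
        msg <- decode(raw)
      } yield msg)
        // Side effect only: the value of the Future is not changed.
        .andThen { case Success(m) => log = log :+ s"Received message: $m" }
        // Runs after the first callback, whatever the outcome was.
        .andThen { case _ => log = log :+ "cleanup" }
    val value = Await.result(result, 2.seconds)
    (value, log)
  }
}
```

Calling AndThenSketch.run() returns the decoded message together with the two log entries, which shows that neither callback replaced the value produced by the for-comprehension.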

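The PUT example above first has to build the data carrier, a RequestEntity. The conversion call Marshal(...).to[RequestEntity] also returns a Future (here a Future[RequestEntity]), so it fits inside the for-comprehension as well. Pay attention to andThen: it can chain a series of steps and performs side effects without changing the result of the upstream computation. Note, however, that although both examples are very concise, they give us no way to monitor exceptions raised during data conversion or to check things such as the status code of the response. So we should split the process into two parts: first obtain the response, then process it, which includes checking the status and handling the data: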

  case class Item(id: Int, name: String, price: Double)

  def getItem(itemId: Int): Future[HttpResponse] = for {
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
  } yield response

  def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {
    futResp.andThen {
      case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
        Unmarshal(entity).to[T]
          .onComplete {
            case Success(t) => println(s"Got response entity: ${t}")
            case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
          }
      case Success(_) => println("Exception in response!")
      case Failure(err) => println(s"Response Failed: ${err.getMessage}")
    }
  }
  extractEntity[Item](getItem(13))
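
Because extractEntity is built on andThen, it returns the original Future[HttpResponse] and only prints the decoded entity. When the caller needs the decoded value itself, one option is to use flatMap instead. The following is a sketch of that idea using only the standard library: Resp and the decode parameter are hypothetical stand-ins for HttpResponse and for the Unmarshal(entity).to[T] step, so it illustrates the shape of the code rather than the Akka HTTP API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object ExtractSketch {
  // Hypothetical stand-in for HttpResponse: a status code and a raw body.
  final case class Resp(status: Int, body: String)

  // Returns the decoded value instead of printing it.
  // A non-200 status or a decoding error becomes a failed Future.
  def extract[T](futResp: Future[Resp])(decode: String => Try[T]): Future[T] =
    futResp.flatMap {
      case Resp(200, body) => Future.fromTry(decode(body))
      case Resp(status, _) =>
        Future.failed(new RuntimeException(s"Unexpected status: $status"))
    }

  // Helper for the demo: block briefly and capture success or failure.
  def await[T](f: Future[T]): Try[T] = Try(Await.result(f, 2.seconds))
}
```

With this shape, a 200 response whose body decodes cleanly yields the value, while a 404 or a body that fails to decode surfaces as a failed Future that the caller can handle with recover or onComplete.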

Now extractEntity[Item](getItem(13)) lets us monitor the whole process. A PUT operation follows the same pattern:

  def putItem(item: Item): Future[HttpResponse] =
   for {
    reqEntity <- Marshal(item).to[RequestEntity]
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
   } yield response

  extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))
     .andThen { case _ => sys.terminate()}

Of course, we are still using the marshalling approach from the previous few posts to convert data formats automatically:

import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
...
trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec
...
  import JsConverters._

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

When the data exchange needs finer-grained control, the Host-Level API is a better fit. Below we build a small client-side utility library on top of the Host-Level API:

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                  (implicit sys: ActorSystem, mat: ActorMaterializer) {

  import sys.dispatcher

  private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)
  // a single request
  def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {
    Source.single(req -> 1)
      .via(cnnPool)
      .runWith(Sink.head).flatMap {
      case (Success(resp), _) => Future.successful(resp)
      case (Failure(fail), _) => Future.failed(fail)
    }
  }
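  // a batch of requests; responses come back in request order
  def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
    // toList, not toMap: a Map keyed by request would merge requests that compare equal
    Source(reqs.zipWithIndex.toList)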
      .via(cnnPool)
      .runFold(SortedMap[Int, Future[HttpResponse]]()) {
        case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
        case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))
      }.flatMap { m => Future.sequence(m.values) }
  }
}
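
orderedResponses relies on two ideas: each request is tagged with its position, and the fold into a SortedMap puts the responses back in request order no matter when they arrive. The sketch below shows both with plain strings standing in for requests and responses (an assumption made so that it runs without Akka). It also shows a pitfall to watch for when tagging: if the tagged pairs are collected into a Map, the request becomes the key, so requests that compare equal collapse into one entry, whereas a List keeps them all.

```scala
import scala.collection.immutable.SortedMap

object OrderingSketch {
  // Tag each request with its position in the batch.
  def tag[A](reqs: Iterable[A]): List[(A, Int)] = reqs.zipWithIndex.toList

  // Rebuild request order from (response, index) pairs that arrive in any order.
  def reorder[B](arrived: Seq[(B, Int)]): List[B] =
    arrived
      .foldLeft(SortedMap.empty[Int, B]) { case (m, (resp, idx)) => m + (idx -> resp) }
      .values
      .toList

  // Two identical requests plus a third one.
  val batch: List[String] = List("GET /item/3", "GET /item/3", "GET /item/7")

  val asList: Int = tag(batch).size              // every request is kept
  val asMap: Int = batch.zipWithIndex.toMap.size // equal requests share one key
}
```

Here asList is 3 while asMap is 2, and reorder(Seq(("r2", 2), ("r0", 0), ("r1", 1))) yields the responses in the order r0, r1, r2.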

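Below is a safer pattern: it uses a queue to buffer requests, which addresses the problems caused by the sender and the receiver running at different rates: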

class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                          (qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)
                  (implicit sys: ActorSystem, mat: ActorMaterializer) {
  import sys.dispatcher
  private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)

  val queue =
    Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)
      .via(cnnPool)
      .to(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      })).run()

  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
  }
}
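
The key device in QueuedRequestsClient is that every request travels through the pool together with its own Promise, and whichever response comes back completes exactly that Promise. Here is a minimal standard-library sketch of that correlation; the string requests and the submit and deliver helpers are hypothetical, standing in for queue.offer and for the Sink.foreach stage.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object PromiseSketch {
  type Tagged = (String, Promise[String])

  // Caller side: pair the request with a fresh Promise and keep its Future.
  def submit(req: String): (Tagged, Future[String]) = {
    val p = Promise[String]()
    ((req, p), p.future)
  }

  // Consumer side: complete the Promise that travelled with the request.
  // p.complete(result) covers both the Success and the Failure case.
  def deliver(result: Try[String], p: Promise[String]): Unit = {
    p.complete(result)
    ()
  }

  def demo(): (String, Try[String]) = {
    val (t1, f1) = submit("GET /item/1")
    val (t2, f2) = submit("GET /item/2")
    // Results arrive in reverse order; each still lands in the right Future.
    deliver(Failure(new RuntimeException("boom")), t2._2)
    deliver(Success(s"response to ${t1._1}"), t1._2)
    (Await.result(f1, 1.second), Try(Await.result(f2, 1.second)))
  }
}
```

The design choice this illustrates is that no lookup table of pending requests is needed: the Promise is the correlation token, so out-of-order completion and per-request failures are handled without extra bookkeeping.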

Here is how these utility functions are used:

  val settings = ConnectionPoolSettings(sys)
    .withMaxConnections(8)
    .withMaxOpenRequests(8)
    .withMaxRetries(3)
    .withPipeliningLimit(4)
  val pooledClient = new PooledClient("localhost",8011,settings)

  def getItemByPool(itemId: Int): Future[HttpResponse] = for {
    response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
  } yield response

  extractEntity[Item](getItemByPool(13))

  def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {
    val reqs = itemIds.map { id =>
      HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")
    }
    val rets = (for {
      responses <- pooledClient.orderedResponses(reqs)
    } yield responses)
    rets
  }
  val futResps = getItemsByPool(List(3,5,7))

  futResps.andThen {
    case Success(listOfResps) => {
      listOfResps.foreach { r =>
        r match {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[Item]
              .onComplete {
                case Success(t) => println(s"Got response entity: ${t}")
                case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
              }
          case _ => println("Exception in response!")
        }
      }
    }
    case _ => println("Failed to get list of responses!")
  }

  val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()


  def putItemByQueue(item: Item): Future[HttpResponse] =
    for {
      reqEntity <- Marshal(item).to[RequestEntity]
      response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
    } yield response

  extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))
    .andThen { case _ => sys.terminate()}

Below is the complete source code for this discussion:

Server code:

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson
trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec


object TestServer extends App with JsonCodec {
  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  import JsConverters._

  case class Item(id: Int, name: String, price: Double)
  val messages = path("message") {
    get {
      complete("hello, how are you?")
    } ~
    put {
      entity(as[String]) {msg =>
        complete(msg)
      }
    }
  }
  val items =
    (path("item" / IntNumber) & get) { id =>
       get {
         complete(Item(id, s"item#$id", id * 2.0))
       }
    } ~
      (path("item") & put) {
        entity(as[Item]) {item =>
          complete(item)
        }
     }

  val route = messages ~ items

  val (host, port) = ("localhost", 8011)

  val bindingFuture = Http().bindAndHandle(route,host,port)


  println(s"Server running at $host $port. Press ENTER to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

Client code:

import akka.actor._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

import scala.util._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson

import scala.concurrent._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling._
import akka.http.scaladsl.marshalling.Marshal

import scala.collection.SortedMap
import akka.http.scaladsl.common._

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

class PooledClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                  (implicit sys: ActorSystem, mat: ActorMaterializer) {

  import sys.dispatcher

  private val cnnPool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Int](host = host, port = port, settings = poolSettings)

  def requestSingleResponse(req: HttpRequest): Future[HttpResponse] = {
    Source.single(req -> 1)
      .via(cnnPool)
      .runWith(Sink.head).flatMap {
      case (Success(resp), _) => Future.successful(resp)
      case (Failure(fail), _) => Future.failed(fail)
    }
  }

  def orderedResponses(reqs: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] = {
    // toList, not toMap: a Map keyed by request would merge requests that compare equal
    Source(reqs.zipWithIndex.toList)
      .via(cnnPool)
      .runFold(SortedMap[Int, Future[HttpResponse]]()) {
        case (m, (Success(r), idx)) => m + (idx -> Future.successful(r))
        case (m, (Failure(f), idx)) => m + (idx -> Future.failed(f))
      }.flatMap { m => Future.sequence(m.values) }
  }
}
class QueuedRequestsClient(host: String, port: Int, poolSettings: ConnectionPoolSettings)
                          (qsize: Int = 10, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)
                  (implicit sys: ActorSystem, mat: ActorMaterializer) {
  import sys.dispatcher
  private val cnnPool: Flow[(HttpRequest,Promise[HttpResponse]),(Try[HttpResponse],Promise[HttpResponse]),Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host=host,port=port,settings=poolSettings)

  val queue =
    Source.queue[(HttpRequest, Promise[HttpResponse])](qsize, overflowStrategy)
      .via(cnnPool)
      .to(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      })).run()

  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
      val responsePromise = Promise[HttpResponse]()
      queue.offer(request -> responsePromise).flatMap {
        case QueueOfferResult.Enqueued    => responsePromise.future
        case QueueOfferResult.Dropped     => Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
        case QueueOfferResult.QueueClosed => Future.failed(new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
      }
  }
}
object ClientRequesting extends App {
  import JsConverters._

  implicit val sys = ActorSystem("sysClient")
  implicit val mat = ActorMaterializer()
  implicit val ec = sys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  case class Item(id: Int, name: String, price: Double)

  def extractEntity[T](futResp: Future[HttpResponse])(implicit um: Unmarshaller[ResponseEntity,T]) = {
    futResp.andThen {
      case Success(HttpResponse(StatusCodes.OK, _, entity, _)) =>
        Unmarshal(entity).to[T]
          .onComplete {
            case Success(t) => println(s"Got response entity: ${t}")
            case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
          }
      case Success(_) => println("Exception in response!")
      case Failure(err) => println(s"Response Failed: ${err.getMessage}")
    }
  }

  
  (for {
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri="http://localhost:8011/message"))
    message <- Unmarshal(response.entity).to[String]
  } yield message).andThen {
    case Success(msg) => println(s"Received message: $msg")
    case Failure(err) => println(s"Error: ${err.getMessage}")
  }  //.andThen {case _ => sys.terminate()}


  (for {
    entity <- Marshal("Wata hell you doing?").to[RequestEntity]
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/message",entity=entity))
    message <- Unmarshal(response.entity).to[String]
  } yield message).andThen {
    case Success(msg) => println(s"Received message: $msg")
    case Failure(err) => println(s"Error: ${err.getMessage}")
  } //.andThen {case _ => sys.terminate()}


  def getItem(itemId: Int): Future[HttpResponse] = for {
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
  } yield response

  extractEntity[Item](getItem(13))

  def putItem(item: Item): Future[HttpResponse] =
   for {
    reqEntity <- Marshal(item).to[RequestEntity]
    response <- Http().singleRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
   } yield response

  extractEntity[Item](putItem(Item(23,"Item#23", 46.0)))
     //.andThen {case _ => sys.terminate()}  // terminating here would shut the system down before the pooled and queued examples below run
  
  val settings = ConnectionPoolSettings(sys)
    .withMaxConnections(8)
    .withMaxOpenRequests(8)
    .withMaxRetries(3)
    .withPipeliningLimit(4)
  val pooledClient = new PooledClient("localhost",8011,settings)

  def getItemByPool(itemId: Int): Future[HttpResponse] = for {
    response <- pooledClient.requestSingleResponse(HttpRequest(method=HttpMethods.GET,uri = s"http://localhost:8011/item/$itemId"))
  } yield response

  extractEntity[Item](getItemByPool(13))

  def getItemsByPool(itemIds: List[Int]): Future[Iterable[HttpResponse]] = {
    val reqs = itemIds.map { id =>
      HttpRequest(method = HttpMethods.GET, uri = s"http://localhost:8011/item/$id")
    }
    val rets = (for {
      responses <- pooledClient.orderedResponses(reqs)
    } yield responses)
    rets
  }
  val futResps = getItemsByPool(List(3,5,7))

  futResps.andThen {
    case Success(listOfResps) => {
      listOfResps.foreach { r =>
        r match {
          case HttpResponse(StatusCodes.OK, _, entity, _) =>
            Unmarshal(entity).to[Item]
              .onComplete {
                case Success(t) => println(s"Got response entity: ${t}")
                case Failure(e) => println(s"Unmarshalling failed: ${e.getMessage}")
              }
          case _ => println("Exception in response!")
        }
      }
    }
    case _ => println("Failed to get list of responses!")
  }

  val queuedClient = new QueuedRequestsClient("localhost",8011,settings)()
  
  def putItemByQueue(item: Item): Future[HttpResponse] =
    for {
      reqEntity <- Marshal(item).to[RequestEntity]
      response <- queuedClient.queueRequest(HttpRequest(method=HttpMethods.PUT,uri="http://localhost:8011/item",entity=reqEntity))
    } yield response

  extractEntity[Item](putItemByQueue(Item(23,"Item#23", 46.0)))
    .andThen { case _ => sys.terminate()}
  

}
