前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >STTP的基本使用(3):Response 转为 Future[( List[E], List[T] )]

STTP的基本使用(3):Response 转为 Future[( List[E], List[T] )]

作者头像
dhyuan
发布2022-05-30 14:05:57
4160
发布2022-05-30 14:05:57
举报
文章被收录于专栏:响应式编程

如果使用 AkkaHttp 作为 STTP 的 backend 来并发地处理 list of url,就会得到类似 List[Future[Response[Either[ResponseError[io.circe.Error], T]]]],这样的结果。

一般地,我们更期望给上层调用者返回 Future[(List[Exception], List[T])]这样的类型。

下面代码演示了如何把:

List[Future[Response[Either[ResponseError[io.circe.Error], NasaData]]]]

准换为

Future[(List[Throwable], List[NasaData])]。

其中 tuple 的第一部分表示 response 中所有的 exception,第二部分表示所有正常数据。

几个技术关键点是:

1)Future.sequence 用于把 List[Future] 转为 Future[List]。

2)忽略 STTP Response 里除去 body 数据的其它部分,并把 response 转为 Either[Throwable, NasaData]。

3)经过步骤 1,2,数据类型已经是 Future[List[Either[Throwable, NasaData]]]。

4)List[Either[Throwable, NasaData]] 转为 (List[Throwable], List[NasaData])的思路是:

a) 构造一个空的 Tuple (List[Throwable(), List[NasaData]())作为一个累计器。

b) 使用 List.folderLeft()遍历元素,根据是 left 还是 right 累加到累计器的_1 或_2。

下面链接的文章演示了两种转换方式,一种是 Scala 原生手写,一种是使用 CAT。https://pbassiner.github.io/blog/composing_future,_list_and_either.html。

但是,其中语句 case Left(err) => acc.left.map(_ => err) 的逻辑似乎与期望不符。如果 list of either 中有 Left,并不能工作。

代码语言:javascript
复制
  val erA: Either[String, Int] = Right(1)
  val erB: Either[String, Int] = Right(2)
  val erC: Either[String, Int] = Right(3)
  val elA: Either[String, Int] = Left("error1")
  val elB: Either[String, Int] = Left("error2")
  val listOfEither = List(erA, elA, erB, erC, elB)
  private val eitherOfList: Either[String, List[Int]] = listOfEither.foldLeft(Right[String, List[Int]](List()).asInstanceOf[Either[String, List[Int]]])((acc, elem) => {
    elem match {
      case Right(v) => acc.map(l => l :+ v)
      case Left(e) => acc.left.map(_ + e)
    }
  })
  println(s"eitherOfList: $eitherOfList")

输出如下,并没有error部分的信息。

代码语言:javascript
复制
eitherOfList: Right(List(1, 2, 3))

下面的代码演示了并发发出100个RestAPi请求获取数据的处理代码,结果保存在Tuple里,分别是所有失败和成功的数据。

代码语言:javascript
复制
  import io.circe.generic.auto._
  import sttp.client._
  import sttp.client.akkahttp.AkkaHttpBackend
  import sttp.client.circe._

  import scala.concurrent.Future
  import scala.concurrent.duration._
  import scala.concurrent.Await
  import scala.concurrent.ExecutionContext.Implicits.global

  val NASA_API_KEY = "XXXXXXXXXXXXXXX"
  val baseUrl = s"https://api.nasa.gov/neo/rest/v1/neo/browse?api_key=${NASA_API_KEY}"

  var pageSize = 10
  var pageNumb = 100

  case class Links(self: String, next: String)
  case class Page(number: Int, size: Int, total_elements: Int, total_pages: Int)
  case class NearEarthObject(absolute_magnitude_h: Double, designation: String)
  case class NasaData(links: Links, page: Page, near_earth_objects: List[NearEarthObject])

  implicit val sttpBackend = AkkaHttpBackend()
  
  val listOfFutureResult: List[Future[Response[Either[ResponseError[io.circe.Error], NasaData]]]] =
    (0 until pageNumb).toList.map(pageIndex => {
      val pageUrl = s"http://www.neowsapp.com/rest/v1/neo/browse?page=$pageIndex&size=$pageSize&api_key=${NASA_API_KEY}"
      basicRequest.get(uri"$pageUrl")
        .response(asJson[NasaData])
        .send()
    })

  // Note: Future[List] --> List[Future]
  val futureOfList: Future[List[Response[Either[ResponseError[io.circe.Error], NasaData]]]] = Future.sequence(listOfFutureResult)

  // Note: 把Response[Either[ResponseError[io.circe.Error], NasaData]] 转为了 Either[Throwable, NasaData]
  val respOfEither2Either: Response[Either[ResponseError[io.circe.Error], NasaData]] => Either[Throwable, NasaData] = resp => {
    println(s"resp==>  code:${resp.code.code}  ${resp.statusText}")
    val newEither: Either[Throwable, NasaData] = resp.body match {
      case Left(respErr) => Left(respErr.getCause)
      case Right(nasaData) => Right(nasaData)
    }
    newEither
  }

  val futureOfListOfEither: Future[List[Either[Throwable, NasaData]]] = futureOfList.map(listOfResp => {
    listOfResp.map(respOfEither2Either)
  })

  val listOfEither2TupleOfList: List[Either[Throwable, NasaData]] => (List[Throwable], List[NasaData]) = listE => {
    listE.foldLeft(List[Throwable](), List[NasaData]())((acc, eData) => {
      eData match {
        case Right(nasaData) => (acc._1, acc._2 :+ nasaData)
        case Left(t) => (acc._1 :+ t, acc._2)
      }
    })
  }

  val futureOfTupleOfList = futureOfListOfEither map listOfEither2TupleOfList

  val result = Await.result(futureOfTupleOfList, 1.minute)
  println(s"Exceptions in Response: $result._1")
  println(s"Data in Response: $result._2")
  

另,Akka HTTP默认主机连接数是32个,所以需要修改applicaiton.confg配置以支持更多连接数。

代码语言:javascript
复制
akka.http.host-connection-pool.max-open-requests = 128
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 响应式编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档