Akka(35): Http:Server side streaming

   在前面几篇讨论里我们都提到过:Akka-http是一项系统集成工具库。它是以数据交换的形式进行系统集成的。所以,Akka-http的核心功能应该是数据交换的实现了:应该能通过某种公开的数据格式和传输标准比较方便的实现包括异类系统之间通过网上进行的数据交换。覆盖包括:数据编码、发送和数据接收、解析全过程。Akka-http提供了许多网上传输标准数据的概括模型以及数据类型转换方法,可以使编程人员很方便的构建网上往来的Request和Response。但是,现实中的数据交换远远不止针对request和response操作能够满足的。系统之间数据交换经常涉及文件或者数据库表类型的数据上传下载。虽然在Http标准中描述了如何通过MultiPart消息类型进行批量数据的传输,但是这个标准涉及的实现细节包括数据内容描述、数据分段方式、消息数据长度计算等等简直可以立即令人却步。Akka-http是基于Akka-stream开发的:不但它的工作流程可以用Akka-stream来表达,它还支持stream化的数据传输。我们知道:Akka-stream提供了功能强大的FileIO和Data-Streaming,可以用Stream-Source代表文件或数据库数据源。简单来说:Akka-http的消息数据内容HttpEntity可以支持理论上无限长度的data-stream。最可贵的是:这个Source是个Reactive-Stream-Source,具备了back-pressure机制,可以有效应付数据交换参与两方Reactive端点不同的数据传输速率。

  Akka-http的stream类型数据内容是以Source[T,_]类型表示的。首先,Akka-stream通过FileIO对象提供了足够多的file-io操作函数,其中有个fromPath函数可以用某个文件内容数据构建一个Source类型:

/**
   * Creates a Source from a files contents.
   * Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements,
   * except the final element, which will be up to `chunkSize` in size.
   *
   * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
   * set it for a given Source by using [[akka.stream.ActorAttributes]].
   *
   * It materializes a [[Future]] of [[IOResult]] containing the number of bytes read from the source file upon completion,
   * and a possible exception if IO operation was not completed successfully.
   *
   * @param f         the file path to read from
   * @param chunkSize the size of each read operation, defaults to 8192
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)

这个函数构建了Source[ByteString,Future[IOResult]],我们需要把ByteString转化成MessageEntity。首先需要在implicit-scope内提供Marshaller[ByteString,MessageEntity]类型的隐式实例:

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._
...

我们还需要Json-Streaming支持:

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)

FileIO是blocking操作,我们还可以选用独立的线程供blocking操作使用:

   FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))

现在我们可以从在server上用一个文件构建Source然后再转成Response:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } 
    }
  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

同样,我们也可以把数据库表内数据转成Akka-Stream-Source,然后再实现到MessageEntity的转换。转换过程包括用Query读取数据库表内数据后转成Reactive-Publisher,然后把publisher转成Akka-Stream-Source,如下:

object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

然后进行到MessageEntity的转换:

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

下面是本次示范的完整源代码:

import java.nio.file._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.common._
import de.heikoseeberger.akkahttpjson4s.Json4sSupport
import org.json4s.jackson


object SlickDAO {
  import slick.jdbc.H2Profile.api._
  val dbConfig: slick.basic.DatabaseConfig[slick.jdbc.H2Profile] = slick.basic.DatabaseConfig.forConfig("slick.h2")
  val db = dbConfig.db

  case class CountyModel(id: Int, name: String)
  case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
    def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
    def name = column[String]("NAME",O.Length(64))
    def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
  }
  val CountyQuery = TableQuery[CountyTable]

  def loadTable(filter: String) = {
    //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
    val publisher = db.stream(qry.result)
    Source.fromPublisher(publisher = publisher)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
  }
}

trait JsonCodec extends Json4sSupport {
  import org.json4s.DefaultFormats
  import org.json4s.ext.JodaTimeSerializers
  implicit val serilizer = jackson.Serialization
  implicit val formats = DefaultFormats ++ JodaTimeSerializers.all
}
object JsConverters extends JsonCodec

object ServerStreaming extends App {
  import JsConverters._

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher

  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
      .withParallelMarshalling(parallelism = 8, unordered = false)



  val (port, host) = (8011,"localhost")

  val route =
    get {
      path("files"/Remaining) { name =>
          complete(loadFile(name))
      } ~
      path("tables"/Segment) { t =>
        complete(SlickDAO.loadTable(t))
      }
    }

  def loadFile(path: String) = {
 //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
    val file = Paths.get("/Users/tiger/"+path)
    FileIO.fromPath(file, 256)
      .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
      .map(_.utf8String)
  }

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()

  bindingFuture.flatMap(_.unbind())
    .onComplete(_ => httpSys.terminate())

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java编程技术

MyBatis中使用流式查询避免数据量过大导致OOM

其中fetchSize="-2147483648",Integer.MIN_VALUE=-2147483648

951
来自专栏xingoo, 一个梦想做发明家的程序员

【设计模式】—— 装饰模式Decorator

  模式意图   在不改变原来类的情况下,进行扩展。   动态的给对象增加一个业务功能,就功能来说,比生成子类更方便。   应用场景   1 在不生成子类的情...

1767
来自专栏函数式编程语言及工具

FunDA(17)- 示范:异常处理与事后处理 - Exceptions handling and Finalizers

    作为一个能安全运行的工具库,为了保证占用资源的安全性,对异常处理(exception handling)和事后处理(final clean-up)的支持...

1747
来自专栏菩提树下的杨过

dubbox 增加google-gprc/protobuf支持

好久没写东西了,今年实在太忙,基本都在搞业务开发,晚上来补一篇,作为今年的收官博客。google-rpc 正式发布以来,受到了不少人的关注,这么知名的rpc框架...

4648
来自专栏扎心了老铁

spark-streaming集成Kafka处理实时数据

在这篇文章里,我们模拟了一个场景,实时分析订单数据,统计实时收益。 场景模拟 我试图覆盖工程上最为常用的一个场景: 1)首先,向Kafka里实时的写入订单数据,...

5445
来自专栏大魏分享(微信公众号:david-share)

应用对持久数据的管理 | 从开发角度看应用架构7

当应用程序将数据存储在永久性存储中(例如flat file,XML文件或数据库的持久性数据)时,它被称为数据的持久性。 关系数据库是企业应用程序用来保存数据以供...

934
来自专栏后端之路

dubbo缓存代码分析

dubbo是Ali出品的soa框架,属于互联网企业常见的rpc选择框架。 前几篇分析了多级缓存的相关代码,本篇就dubbo的缓存进行梳理。 dubbo的缓存针...

2387
来自专栏码匠的流水账

easy-rules小试牛刀

easy-rules-core-3.1.0-sources.jar!/org/jeasy/rules/api/Rule.java

481
来自专栏JavaQ

Hystrix实现分布式系统中的故障容错

Hystrix是什么 分布式服务系统通常会通过HTTP或RPC方式调用所依赖的服务,例如支付服务通过HTTP或RPC调用银行卡服务。在高并发请求的情景下,依赖的...

3415
来自专栏wannshan(javaer,RPC)

dubbo负载均衡代码分析1(leastactive策略)

接上篇 https://cloud.tencent.com/developer/article/1109584 既然有集群容错,自然会有负载均衡。dubbo通过...

3656

扫码关注云+社区