首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用Akka Stream和Akk-Http来流式传输响应

如何使用Akka Stream和Akk-Http来流式传输响应
EN

Stack Overflow用户
提问于 2019-03-17 20:07:12
回答 1查看 115关注 0票数 0

我是新来Akka Stream的我使用以下代码进行CSV解析。

代码语言:javascript
运行
复制
class CsvParser(config: Config)(implicit system: ActorSystem) extends LazyLogging with NumberValidation {

  import system.dispatcher

  private val importDirectory = Paths.get(config.getString("importer.import-directory")).toFile
  private val linesToSkip = config.getInt("importer.lines-to-skip")
  private val concurrentFiles = config.getInt("importer.concurrent-files")
  private val concurrentWrites = config.getInt("importer.concurrent-writes")
  private val nonIOParallelism = config.getInt("importer.non-io-parallelism")

  def save(r: ValidReading): Future[Unit] = {
      Future()
  }

  def parseLine(filePath: String)(line: String): Future[Reading] = Future {
    val fields = line.split(";")
    val id = fields(0).toInt
    try {
      val value = fields(1).toDouble
      ValidReading(id, value)
    } catch {
      case t: Throwable =>
        logger.error(s"Unable to parse line in $filePath:\n$line: ${t.getMessage}")
        InvalidReading(id)
    }
  }

  val lineDelimiter: Flow[ByteString, ByteString, NotUsed] =
    Framing.delimiter(ByteString("\n"), 128, allowTruncation = true)

  val parseFile: Flow[File, Reading, NotUsed] =
    Flow[File].flatMapConcat { file =>
      val src = FileSource.fromFile(file).getLines()
      val source : Source[String, NotUsed] = Source.fromIterator(() => src)
      // val gzipInputStream = new GZIPInputStream(new FileInputStream(file))

      source
        .mapAsync(parallelism = nonIOParallelism)(parseLine(file.getPath))
    }

  val computeAverage: Flow[Reading, ValidReading, NotUsed] =
    Flow[Reading].grouped(2).mapAsyncUnordered(parallelism = nonIOParallelism) { readings =>
      Future {
        val validReadings = readings.collect { case r: ValidReading => r }
        val average = if (validReadings.nonEmpty) validReadings.map(_.value).sum / validReadings.size else -1
        ValidReading(readings.head.id, average)
      }
    }

  val storeReadings: Sink[ValidReading, Future[Done]] =
    Flow[ValidReading]
      .mapAsyncUnordered(concurrentWrites)(save)
      .toMat(Sink.ignore)(Keep.right)

  val processSingleFile: Flow[File, ValidReading, NotUsed] =
    Flow[File]
      .via(parseFile)
      .via(computeAverage)

  def importFromFiles = {
    implicit val materializer = ActorMaterializer()

    val files = importDirectory.listFiles.toList
    logger.info(s"Starting import of ${files.size} files from ${importDirectory.getPath}")

    val startTime = System.currentTimeMillis()

    val balancer = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val balance = builder.add(Balance[File](concurrentFiles))
      val merge = builder.add(Merge[ValidReading](concurrentFiles))

      (1 to concurrentFiles).foreach { _ =>
        balance ~> processSingleFile ~> merge
      }

      FlowShape(balance.in, merge.out)
    }

    Source(files)
      .via(balancer)
      .withAttributes(ActorAttributes.supervisionStrategy { e =>
        logger.error("Exception thrown during stream processing", e)
        Supervision.Resume
      })
      .runWith(storeReadings)
      .andThen {
        case Success(_) =>
          val elapsedTime = (System.currentTimeMillis() - startTime) / 1000.0
          logger.info(s"Import finished in ${elapsedTime}s")
        case Failure(e) => logger.error("Import failed", e)
      }
  }
}

我想使用Akka HTTP,它将提供从CSV解析的所有ValidReading实体,但我不明白该如何做到这一点。

上面的代码从服务器获取文件,并解析每一行以生成ValidReading

如何通过akka-http传递/上传CSV,解析文件并将结果响应流式传输回端点?

EN

回答 1

Stack Overflow用户

发布于 2019-03-18 00:48:22

解决方案的“本质”是这样的:

代码语言:javascript
运行
复制
import akka.http.scaladsl.server.Directives._
val route = fileUpload("csv") {
  case (metadata, byteSource) =>
    val source = byteSource.map(x => x)
    complete(HttpResponse(entity = HttpEntity(ContentTypes.`text/csv(UTF-8)`, source)))
}

您检测到上传的内容是一个多部分表单数据,其中包含一个名为"csv“的块。你可以从中得到byteSource。执行计算(将您的逻辑插入到.map(x=>x)部件)。将您的数据转换回ByteString。使用新的源完成请求。这将使你的endoint像一个代理。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55206864

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档