restapi (1): File Upload and Download Service

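Last time I got restapi started by designing a RESTful service framework with authentication and authorization built in. It is a general-purpose framework: developers only need to plug new functionality into it. Although restapi is designed around CRUD operations on database tables, exchanging file data between server and client is also very common, especially for multimedia such as images. So let's design a file exchange service and see how easily it can be added to the restapi framework.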

akka-http is built on akka-stream and uses it heavily. akka-stream includes a FileIO component that provides file read and write operations, exposed as stream Sources and Sinks:

...
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] =
    fromPath(f, chunkSize, startPosition = 0)
...
  def toPath(
      f: Path,
      options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] =
    toPath(f, options, startPosition = 0)

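As you can see, these Sources and Sinks use ByteString as the stream element. akka-http ships with a Marshaller for ByteString, so the data is converted automatically and no extra format conversion is needed for network transfer. First, use FileIO to produce a Source[ByteString, _]: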

package com.datatech.restapi
import akka.stream._
import akka.stream.scaladsl._
import java.nio.file._
import akka.util._
object FileStreaming {

  def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = {
    def loadFile  = {
      //   implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher")
      val file = Paths.get(filePath)
      if (dispatcherName != "")
        FileIO.fromPath(file, chunkSize)
         .withAttributes(ActorAttributes.dispatcher(dispatcherName))
      else
        FileIO.fromPath(file, chunkSize)
    }
    loadFile
  }
}
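As a usage sketch, a Source[ByteString, _] like the one above can be run straight into a FileIO sink. The object name CopyFileDemo and the temporary files are assumptions made for illustration, and the code uses the Akka 2.5 ActorMaterializer API that the rest of this article relies on:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.FileIO
import java.nio.file.{Files, Path}
import scala.concurrent.Await
import scala.concurrent.duration._

object CopyFileDemo {
  // Streams src into dst and returns the number of bytes written.
  def copy(src: Path, dst: Path): Long = {
    implicit val system = ActorSystem("copyDemo")
    implicit val mat = ActorMaterializer()
    try {
      // Source[ByteString, _] feeds Sink[ByteString, _] directly: no conversion step.
      val done = FileIO.fromPath(src, chunkSize = 1024).runWith(FileIO.toPath(dst))
      Await.result(done, 10.seconds).count
    } finally { system.terminate(); () }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical demo data: a temporary 5000-byte file.
    val src = Files.createTempFile("demo-src", ".bin")
    Files.write(src, Array.fill[Byte](5000)(1))
    val dst = Files.createTempFile("demo-dst", ".bin")
    println(s"copied ${copy(src, dst)} bytes")
  }
}
```

The same pattern is what the server and client later in this article do with the HTTP entity's data bytes.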

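Note that the FileIO operations can be run on a dedicated thread pool, separate from Akka's default dispatcher, so that blocking file access does not degrade the rest of the system. dispatcherName is the id of a custom dispatcher defined in application.conf; for the block below, that id is its full config path, akka.http.blocking-ops-dispatcher: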

akka {
  http {
    blocking-ops-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        // or in Akka 2.4.2+
        fixed-pool-size = 16
      }
      throughput = 100
    }
  }
}
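To illustrate how the dispatcher id is passed, here is a minimal sketch that inlines the same dispatcher definition and reads a file on it. The object name DispatcherDemo and the helper countBytes are hypothetical; in the real project the config would live in application.conf and the id would be handed to fileStreamSource as dispatcherName:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer}
import akka.stream.scaladsl.FileIO
import com.typesafe.config.ConfigFactory
import java.nio.file.Path
import scala.concurrent.Await
import scala.concurrent.duration._

object DispatcherDemo {
  // Same dispatcher definition as above, inlined so the sketch is self-contained.
  val conf = ConfigFactory.parseString("""
    akka.http.blocking-ops-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor { fixed-pool-size = 16 }
      throughput = 100
    }""").withFallback(ConfigFactory.load())

  // Reads the file on the custom dispatcher and returns its size in bytes.
  def countBytes(file: Path): Long = {
    implicit val system = ActorSystem("dispatcherDemo", conf)
    implicit val mat = ActorMaterializer()
    try {
      val total = FileIO.fromPath(file, chunkSize = 1024)
        // The dispatcher id is the full config path of the block.
        .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher"))
        .runFold(0L)(_ + _.size)
      Await.result(total, 10.seconds)
    } finally { system.terminate(); () }
  }
}
```

If the id does not match a configured dispatcher, the stream fails when it is materialized, so a typo in dispatcherName shows up at run time rather than at compile time.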

Below is the design of FileRoute, the route that provides the file service:

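package com.datatech.restapi
import akka.actor._
import akka.stream.scaladsl._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model._
import akka.http.scaladsl.coding.Gzip
import java.nio.file._
import scala.util.Success
import FileStreaming._
import AuthBase._

case class FileRoute(jwt: String)(implicit auth: AuthBase, sys: ActorSystem) {

  val route: Route = pathPrefix("file") {
    // The user's private directory is carried in the JWT.
    val privatePath = auth.tempDirFromJwt(jwt)
    if (privatePath.isEmpty)
      // No private directory in the token: answer every request under /file with 404.
      // (Without the else branch this result would be discarded and the routes below would still run.)
      complete(StatusCodes.NotFound)
    else
      // GET /file/<filename>: stream the file back, gzip-encoded.
      (get & path(Remaining)) { filename =>
        withoutSizeLimit {
          encodeResponseWith(Gzip) {
            complete(
              HttpEntity(
                ContentTypes.`application/octet-stream`,
                fileStreamSource(privatePath+"/download/"+filename, 1024))
            )
          }
        }
      } ~
      // POST /file?filename=<name>: write the request body to the upload directory.
      (post & parameters('filename)) { filename =>
        withoutSizeLimit {
          decodeRequest {
            // Use the server's materializer instead of creating a new one per request.
            extractMaterializer { implicit mat =>
              extractDataBytes { bytes =>
                val fut = bytes.runWith(FileIO.toPath(Paths.get(privatePath+"/upload/"+filename)))
                onComplete(fut) {
                  // Report OK only when the write actually succeeded.
                  case Success(result) if result.wasSuccessful => complete(StatusCodes.OK)
                  case _ => complete(StatusCodes.InternalServerError)
                }
              }
            }
          }
        }
      }
  }
}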
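The route above builds file paths by concatenating the user's directory with a client-supplied filename. One possible hardening step, shown here only as a sketch and not part of the original design, is to resolve the name under the base directory and refuse anything that escapes it. SafePath and resolveUnder are hypothetical names:

```scala
import java.nio.file.{Path, Paths}

object SafePath {
  // Resolves `filename` under `baseDir`.
  // Returns None if the result is the base directory itself or lies outside it
  // (for example "../../etc/passwd" or an absolute path).
  def resolveUnder(baseDir: String, filename: String): Option[Path] = {
    val base = Paths.get(baseDir).toAbsolutePath.normalize()
    val target = base.resolve(filename).normalize()
    if (target.startsWith(base) && target != base) Some(target) else None
  }
}
```

In FileRoute this could be applied to privatePath+"/download" and privatePath+"/upload" before opening the file, completing with StatusCodes.NotFound or StatusCodes.BadRequest when the result is None.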

Each user should have a separate file directory on the server, and the JWT is a convenient place to carry it:

package com.datatech.restapi
import akka.http.scaladsl.server.directives.Credentials
import AuthBase._
object MockUserAuthService {

  case class User(username: String, password: String, userInfo: UserInfo)
  val validUsers = Seq(User("johnny", "p4ssw0rd",
       Map("shopid" -> "1101", "userid" -> "101", "tmpdir" ->"/users/tiger-macpro/1101101"))
    ,User("tiger", "secret",
      Map("shopid" -> "1101" , "userid" -> "102", "tmpdir" ->"/users/tiger-macpro/1101102")))

  def getValidUser(credentials: Credentials): Option[UserInfo] =
    credentials match {
      case p @ Credentials.Provided(_) =>
        validUsers.find(user => user.username == p.identifier && p.verify(user.password)) match {
          case Some(user) => Some(user.userInfo)
          case _ => None
        }
      case _ => None
    }

}

The personal directory tmpdir is stored in UserInfo, so we only need to extract it from the JWT:

   def tempDirFromJwt(jwt: String): String = {
      val optUserInfo = getUserInfo(jwt)
      val dir: String = optUserInfo match {
        case Some(m) =>
          try {
            m("tmpdir").toString
          } catch {case err: Throwable => ""}
        case None => ""
      }
      dir
    }
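The same lookup can be written without try/catch by staying inside Option. This is only an equivalent sketch: TmpDir and tempDirOf are hypothetical names, and the function takes the already decoded Option[UserInfo] so that it can run on its own:

```scala
object TmpDir {
  type UserInfo = Map[String, Any]

  // Returns the "tmpdir" entry as a String, or "" when the token could not be
  // decoded (None) or carries no "tmpdir" key.
  def tempDirOf(optUserInfo: Option[UserInfo]): String =
    optUserInfo.flatMap(_.get("tmpdir")).map(_.toString).getOrElse("")
}
```

Inside AuthBase this would amount to calling tempDirOf(getUserInfo(jwt)). The empty-string result keeps the contract that FileRoute checks for.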

The file exchange service requires authorization, so FileRoute has to be placed under authenticateOAuth2:

 val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
              FileRoute(validToken)
                .route
            // ~ ...
          }
     }

Write a client to test the file exchange service:

import akka.stream._
import java.nio.file._
import java.io._
import akka.http.scaladsl.model.headers._
import scala.concurrent._
import com.datatech.restapi.FileStreaming._
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import scala.util._


case class FileUtil(implicit sys: ActorSystem) {
  import sys.dispatcher
  implicit val mat = ActorMaterializer()
  def createEntity(file: File): RequestEntity = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Await.result(Marshal(formData).to[RequestEntity], 3 seconds)
  }

  def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = {
    // Reuses the class-level materializer and dispatcher instead of creating new ones per call.
    val futResp = Http(sys).singleRequest(
   //   Gzip.encodeMessage(
        request.copy(entity = dataEntity)   //.addHeader(`Content-Encoding`(HttpEncodings.gzip))
   //   )
    )
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.dataBytes.map(_.utf8String).runForeach(println)
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Upload request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) => println(s"Upload failed: ${err.getMessage}")
      }
  }

  def downloadFileTo(request: HttpRequest, destPath: String) = {
    // The server gzip-encodes the response (encodeResponseWith(Gzip)), so decode it before
    // saving. decodeMessage leaves a response without a Content-Encoding header unchanged.
    val futResp = Http(sys).singleRequest(request)
      .map(akka.http.scaladsl.coding.Gzip.decodeMessage(_))
    futResp
      .andThen {
        case Success(r@HttpResponse(StatusCodes.OK, _, entity, _)) =>
          entity.withoutSizeLimit().dataBytes.runWith(FileIO.toPath(Paths.get(destPath)))
            .onComplete { case _ => println(s"Download file saved to: $destPath") }
        case Success(r@HttpResponse(code, _, _, _)) =>
          println(s"Download request failed, response code: $code")
          r.discardEntityBytes()
        case Failure(err) => println(s"Download failed: ${err.getMessage}")
      }

  }

}

object TestFileClient  {
  type UserInfo = Map[String,Any]
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    // needed for the future flatMap/onComplete in the end
    implicit val executionContext = system.dispatcher

    val helloRequest = HttpRequest(uri = "http://192.168.11.189:50081/")

    val authorization = headers.Authorization(BasicHttpCredentials("johnny", "p4ssw0rd"))
    val authRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/auth",
      headers = List(authorization)
    )


    val futToken: Future[HttpResponse] = Http().singleRequest(authRequest)

    val respToken = for {
      resp <- futToken
      jstr <- resp.entity.dataBytes.runFold("") {(s,b) => s + b.utf8String}
    } yield jstr

    val jstr =  Await.result[String](respToken,2 seconds)
    println(jstr)

    scala.io.StdIn.readLine()

    val authentication = headers.Authorization(OAuth2BearerToken(jstr))

    val entity = HttpEntity(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )
    //
    val chunked = HttpEntity.Chunked.fromData(
      ContentTypes.`application/octet-stream`,
      fileStreamSource("/Users/tiger-macpro/cert3/ctiger.jpg",1024)
    )

    val multipart = FileUtil().createEntity(new File("/Users/tiger-macpro/cert3/ctiger.jpg"))

    val uploadRequest = HttpRequest(
      HttpMethods.POST,
      uri = "http://192.168.11.189:50081/api/file?filename=mypic.jpg",
    ).addHeader(authentication)

    //upload file
    //Await.ready(FileUtil().uploadFile(uploadRequest,entity),2 seconds)
    //Await.ready(FileUtil().uploadFile(uploadRequest,chunked),2 seconds)
    Await.ready(FileUtil().uploadFile(uploadRequest,multipart),2 seconds)

    val dlRequest = HttpRequest(
      HttpMethods.GET,
      uri = "http://192.168.11.189:50081/api/file/mypic.jpg",
    ).addHeader(authentication)

    FileUtil().downloadFileTo(dlRequest, "/users/tiger-macpro/cert3/mypic.jpg")

    scala.io.StdIn.readLine()
    system.terminate()
  }

}

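For uploads I tried building the request entity in three ways (entity, chunked and multipart), and the server accepted all of them. Many Java HttpClient image-upload examples I have seen use a multipart entity. Note, however, that FileRoute reads the request with extractDataBytes, so a multipart body is written to disk as received, boundary lines and part headers included; to recover the original file from a multipart upload, the server has to unmarshal the request as Multipart.FormData. The server could likewise offer downloads as multipart, which is not implemented here. The following sample code shows multipart handling: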

import akka.actor._
import akka.stream._
import java.nio.file._
import java.io._

import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.util.ByteString

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.Http
import akka.stream.scaladsl.{FileIO, Source}
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Future

object TestMultipartFileUpload extends App {
  val testConf: Config = ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.log-dead-letters = off""")
  implicit val system = ActorSystem("ServerTest", testConf)
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val testFile: File = new File("/users/tiger-macpro/downloads/uploadFileDemo.scala")  //args(0))

  def startTestServer(): Future[ServerBinding] = {
    import akka.http.scaladsl.server.Directives._

    val route: Route =
      path("upload") {
        entity(as[Multipart.FormData]) { (formdata: Multipart.FormData) ⇒

          val fileNamesFuture = formdata.parts.mapAsync(1) { p ⇒
            println(s"Got part. name: ${p.name} filename: ${p.filename}")

            println("Counting size...")
            @volatile var lastReport = System.currentTimeMillis()
            @volatile var lastSize = 0L
            def receiveChunk(counter: (Long, Long), chunk: ByteString): (Long, Long) = {
              val (oldSize, oldChunks) = counter
              val newSize = oldSize + chunk.size
              val newChunks = oldChunks + 1

              val now = System.currentTimeMillis()
              if (now > lastReport + 1000) {
                val lastedTotal = now - lastReport
                val bytesSinceLast = newSize - lastSize
                val speedMBPS = bytesSinceLast.toDouble / 1000000 /* bytes per MB */ / lastedTotal * 1000 /* millis per second */

                println(f"Already got $newChunks%7d chunks with total size $newSize%11d bytes avg chunksize ${newSize / newChunks}%7d bytes/chunk speed: $speedMBPS%6.2f MB/s")

                lastReport = now
                lastSize = newSize
              }
              (newSize, newChunks)
            }

            p.entity.dataBytes.runFold((0L, 0L))(receiveChunk).map {
              case (size, numChunks) ⇒
                println(s"Size is $size")
                (p.name, p.filename, size)
            }
          }.runFold(Seq.empty[(String, Option[String], Long)])(_ :+ _).map(_.mkString(", "))

          complete {
            fileNamesFuture
          }
        }
      }
    Http().bindAndHandle(route, interface = "localhost", port = 0)
  }

  def createEntity(file: File): Future[RequestEntity] = {
    require(file.exists())
    val formData =
      Multipart.FormData(
        Source.single(
          Multipart.FormData.BodyPart(
            "test",
            HttpEntity(MediaTypes.`application/octet-stream`, file.length(), FileIO.fromPath(file.toPath, chunkSize = 100000)), // the chunk size here is currently critical for performance
            Map("filename" -> file.getName))))
    Marshal(formData).to[RequestEntity]
  }

  def createRequest(target: Uri, file: File): Future[HttpRequest] =
    for {
      e ← createEntity(file)
    } yield HttpRequest(HttpMethods.POST, uri = target, entity = e)

  try {
    val result =
      for {
        ServerBinding(address) ← startTestServer()
        _ = println(s"Server up at $address")
        port = address.getPort
        target = Uri(scheme = "http", authority = Uri.Authority(Uri.Host("localhost"), port = port), path = Uri.Path("/upload"))
        req ← createRequest(target, testFile)
        _ = println(s"Running request, uploading test file of size ${testFile.length} bytes")
        response ← Http().singleRequest(req)
        responseBodyAsString ← Unmarshal(response).to[String]
      } yield responseBodyAsString

    result.onComplete { res ⇒
      println(s"The result was $res")
      system.terminate()
    }

    system.scheduler.scheduleOnce(60.seconds) {
      println("Shutting down after timeout...")
      system.terminate()
    }
  } catch {
    case _: Throwable ⇒ system.terminate()
  }
}

The sample above covers multipart handling on both the server side and the client side.
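For a route that stores a multipart upload rather than just measuring it, akka-http's fileUpload directive hands over the part's metadata and its byte stream. The following is a minimal sketch under assumptions: MultipartUploadRoute and uploadDir are hypothetical names, "test" is the form field name used by createEntity above, and wasSuccessful is the Akka 2.5 IOResult API matching the versions in this article's build.sbt:

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
import scala.util.Success

object MultipartUploadRoute {
  // uploadDir is a hypothetical target directory supplied by the caller.
  def route(uploadDir: String): Route =
    (post & path("upload")) {
      withoutSizeLimit {
        extractMaterializer { implicit mat =>
          // Extracts the form field "test": its FileInfo and its content as a byte stream.
          fileUpload("test") { case (info, bytes) =>
            // info.fileName comes from the client and should be validated before real use.
            val dest = Paths.get(uploadDir, info.fileName)
            onComplete(bytes.runWith(FileIO.toPath(dest))) {
              case Success(r) if r.wasSuccessful => complete(s"saved ${r.count} bytes")
              case _ => complete(StatusCodes.InternalServerError)
            }
          }
        }
      }
    }
}
```

Unlike extractDataBytes, this writes only the content of the uploaded part, without the multipart boundaries.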

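In this example we first designed a standalone FileRoute class containing the file exchange functionality, then simply appended FileRoute.route to the main route to add the service. This comes fairly close to the original design goal of restapi.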

Below is the source code for this demo:

build.sbt

name := "restapi"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"   % "10.1.8",
  "com.typesafe.akka" %% "akka-stream" % "2.5.23",
  "com.pauldijou" %% "jwt-core" % "3.0.1",
  "de.heikoseeberger" %% "akka-http-json4s" % "1.22.0",
  "org.json4s" %% "json4s-native" % "3.6.1",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "org.slf4j" % "slf4j-simple" % "1.7.25",
  "org.json4s" %% "json4s-jackson" % "3.6.7",
  "org.json4s" %% "json4s-ext" % "3.6.7"
)

auth/AuthBase.scala

package com.datatech.restapi

import akka.http.scaladsl.server.directives.Credentials
import pdi.jwt._
import org.json4s.native.Json
import org.json4s._
import org.json4s.jackson.JsonMethods._
import pdi.jwt.algorithms._
import scala.util._

object AuthBase {
  type UserInfo = Map[String, Any]
  case class AuthBase(
                       algorithm: JwtAlgorithm = JwtAlgorithm.HMD5,
                       secret: String = "OpenSesame",
                       getUserInfo: Credentials => Option[UserInfo] = null) {
    ctx =>
    def withAlgorithm(algo: JwtAlgorithm): AuthBase = ctx.copy(algorithm=algo)
    def withSecretKey(key: String): AuthBase = ctx.copy(secret = key)
    def withUserFunc(f: Credentials => Option[UserInfo]): AuthBase = ctx.copy(getUserInfo = f)

    def authenticateToken(credentials: Credentials): Option[String] =
      credentials match {
        case Credentials.Provided(token) =>
          algorithm match {
            case algo: JwtAsymmetricAlgorithm =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtAsymmetricAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
            case _ =>
              Jwt.isValid(token, secret, Seq((algorithm.asInstanceOf[JwtHmacAlgorithm]))) match {
                case true => Some(token)
                case _ => None
              }
          }
        case _ => None
      }

    def getUserInfo(token: String): Option[UserInfo] = {
      algorithm match {
        case algo: JwtAsymmetricAlgorithm =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtAsymmetricAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
        case _ =>
          Jwt.decodeRawAll(token, secret, Seq(algorithm.asInstanceOf[JwtHmacAlgorithm])) match {
            case Success(parts) => Some(((parse(parts._2).asInstanceOf[JObject]) \ "userinfo").values.asInstanceOf[UserInfo])
            case Failure(err) => None
          }
      }
    }

    def issueJwt(userinfo: UserInfo): String = {
      val claims = JwtClaim() + Json(DefaultFormats).write(("userinfo", userinfo))
      Jwt.encode(claims, secret, algorithm)
    }

    def tempDirFromJwt(jwt: String): String = {
      val optUserInfo = getUserInfo(jwt)
      val dir: String = optUserInfo match {
        case Some(m) =>
          try {
            m("tmpdir").toString
          } catch {case err: Throwable => ""}
        case None => ""
      }
      dir
    }

  }

}

file/FileStreaming.scala, file/FileRoute.scala, MockUserAuthService.scala

These three files are listed in full earlier in the article.

RestApiServer.scala

package com.datatech.restapi

import akka.actor._
import akka.stream._
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import pdi.jwt._
import AuthBase._
import MockUserAuthService._

object RestApiServer extends App {

  implicit val httpSys = ActorSystem("httpSystem")
  implicit val httpMat = ActorMaterializer()
  implicit val httpEC = httpSys.dispatcher



  implicit val authenticator = new AuthBase()
    .withAlgorithm(JwtAlgorithm.HS256)
    .withSecretKey("OpenSesame")
    .withUserFunc(getValidUser)

  val route =
     path("auth") {
        authenticateBasic(realm = "auth", authenticator.getUserInfo) { userinfo =>
          post { complete(authenticator.issueJwt(userinfo))}
        }
     } ~
       pathPrefix("api") {
          authenticateOAuth2(realm = "api", authenticator.authenticateToken) { validToken =>
            (path("hello") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
            (path("how are you") & get) {
              complete(s"Hello! userinfo = ${authenticator.getUserInfo(validToken)}")
            } ~
              FileRoute(validToken)
                .route
            // ~ ...
          }
     }


  val (port, host) = (50081,"192.168.11.189")

  val bindingFuture = Http().bindAndHandle(route,host,port)

  println(s"Server running at $host $port. Press any key to exit ...")

  scala.io.StdIn.readLine()


  bindingFuture.flatMap(_.unbind())
  .onComplete(_ => httpSys.terminate())


}

TestFileClient.scala

Listed in full earlier in the article.
