在Akka-http里我们把需要传输的数据转换成ByteString,通过网络发送給接收端、接收端再把收到消息Entity中的ByteString转换成目标类型的数据。...) .andThen{case _ => sys.terminate()} 从显示的结果可以得出runService函数中的entity.dataBytes.map(_.utf8String)已经把...ByteString转换成了String,也就是说服务器端发送的Entity里的数据是ByteString。...import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.model._ import akka.util.ByteString...import akka.http.scaladsl.model._ import scala.concurrent.duration._ import akka.util.ByteString import
在上期讨论我们提到过这种转换其实是ROW->Json->ByteString或者反方向的转换,在Akka-http里称之为Marshalling和Unmarshalling。...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives...服务端收到数据后又要进行反向的转换即把Request.Entity.dataBytes从Source[ByteString,_]转回Source[T,_]。...如下: import akka.util.ByteString import akka.http.scaladsl.model.HttpEntity.limitableByteSource...://localhost:8011/rows")) import akka.util.ByteString import akka.http.scaladsl.model.HttpEntity.limitableByteSource
在上篇我们介绍了Akka-http Low-Level-Api。实际上这个Api提供了Server对进来的Http-requests进行处理及反应的自定义Flow或者转换函数的接入界面。...我们看看下面官方文档给出的例子: import akka.actor.ActorSystem import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpMethods...._ import akka.http.scaladsl.model._ import akka.stream.ActorMaterializer import scala.io.StdIn object...Akka-http提供了一套routing DSL作为High-Level-Api的主要组成部分。...Akka-http提供了所有22个TupleXX[L]的隐形实例。
更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。 ...,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下: def fileStream(filePath: String, chunkSize: Int): Source[ByteString...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource...import akka.http.scaladsl.model._ import java.nio.file._ import akka.util.ByteString import scala.util
akka-http提供了一套功能强大,使用又很方便的Routing DSL。...这个是通过 ~ 操作符号实现的 在Akka-http的routing DSL里这些Route组合操作是通过Directive实现的。...Akka-http提供了大量现成的Directive,我们也可以自定义一些特殊功能的Directive,详情可以查询官方文件或者api文件。...Akka-http提供了所有22个TupleXX[L]的隐形实例。...Directive1[Unit] = Directive0 Directive1,Directive0: type Directive0 = Directive[Unit] type Directive1
Akka-http是基于Akka-stream开发的:不但它的工作流程可以用Akka-stream来表达,它还支持stream化的数据传输。...Akka-http的stream类型数据内容是以Source[T,_]类型表示的。...* Emitted elements are `chunkSize` sized [[akka.util.ByteString]] elements, * except the final...")) 现在我们可以从在server上用一个文件构建Source然后再转成Response: val route = get { path("files"/Remaining)...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives
上篇我们讨论了Akka-http的文件交换。由于文件内容编码和传输线上数据表达型式皆为bytes,所以可以直接把文件内容存进HttpEntity中进行传递。...(p.toString)) } 这个Marshaller代表的转换过程是:Person -> Person.String -> ByteString。...Akka-http自带的Json解决方案用的是Spray-Json,下面我们就用Spray-Json来实现转换: import akka.http.scaladsl.marshallers.sprayjson...下面是本次讨论的示范源代码: import akka.actor._ import akka.stream.scaladsl._ import akka.http.scaladsl.marshalling...._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.MediaTypes._ import akka.util.ByteString
,akka-http自带了ByteString的Marshaller,可以实现数据格式自动转换,在网络传输中不需要增加什么数据格式转换动作。...String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString,Any] = { def loadFile...akka.http.scaladsl.unmarshalling.Unmarshal import akka.util.ByteString import scala.concurrent.duration...import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Route import akka.http.scaladsl.Http...def fileStreamSource(filePath: String, chunkSize: Int = 1024, dispatcherName: String = ""): Source[ ByteString
我们可以通过Akka-http的raw-header来实现附加自定义消息的传递,这项功能可以通过Akka-http提供的raw-header筛选功能来实现。...import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import scala.util._...implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() import akka.util.ByteString...._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka._ import...import scala.concurrent._ import akka.http.scaladsl.server._ import akka.http.scaladsl.server.Directives
首先,用akka-http搭建一个http server框架: import akka.actor._ import akka.stream._ import akka.http.scaladsl.Http...下一步研究一下如何构建返回的HttpResponse:httpresponse是从server端传送到client端的。...这个过程包括把HttpResponse Entity里的数据从某种类型转换成通讯用的二进制数据流、到了客户端再转换成目标类型。...akka-http的数据转换机制Marshaller/Unmarshaller是通过类型转换的隐式实例来实现的,akka-http提供了多个标准类型数据转换的隐式实例,如StringMarshaller..." %% "akka-http" % "10.1.8" , "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8", "com.typesafe.akka
Akka-http提供了许多类型的预设实例到Mashalling转换: PredefinedToEntityMarshallers Array[Byte] ByteString Array[Char]...String akka.http.scaladsl.model.FormData akka.http.scaladsl.model.MessageEntity T <: akka.http.scaladsl.model.Multipart...下面是一些Marshal用例: import akka.util.ByteString import akka.http.scaladsl.model....Akka-http通过akka-http-spray-json模块直接支持由Spray-Json实现的Json读写工具库。...import akka.stream._ import akka.util.ByteString import akka.http.scaladsl.marshalling.Marshal import
[Char] String akka.http.scaladsl.model.FormData GenericUnmarshallers Unmarshaller[T, T] (identity unmarshaller...注意:这一步只包括了从网上可传输类型到程序类型转换这一过程,不包括具体实现时的Json转换。...下面是一些Unmarshal的用例: import akka.actor._ import akka.stream._ import akka.http.scaladsl.unmarshalling.Unmarshal...import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ object Unmarshalling...import akka.http.scaladsl.model._ import akka.http.scaladsl.server.Directives._ import scala.concurrent
由于jdbc数据库不支持分布式的运算模式,所以从数据交换的角度上它与集群环境是脱离的:jdbc数据不可以从集群中的任何节点获取。所以只有通过基于http的一种服务来向其它节点提供数据。...我首先考虑了akka-http,在准备过程中接触了gRPC,发现gRPC更加适合跨jvm的程序控制,主要因为gRPC支持双向的流控制。...首先示范一个传统的Unary(request/response)模式实现:从客户端向服务端发出一个Query指令、服务端按指令从JDBC数据库中返回DataRows。...scalaPB自动把bytes类型对应成ByteString如下: parameters: _root_.com.google.protobuf.ByteString = _root_.com.google.protobuf.ByteString.EMPTY...._ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.
举个形象的例子:如果实现把脏水从A点引到B点输出纯净水作为某种函数式程序,编程如同搭建管道网。...单从这个功能来讲,应该由几个环节组成: 1、从上传的数据中抽出图片下载网址 2、下载图片,通过http的request请求,从response里获取图片数据流 3、通过mongodb的count功能获取图片系列序号...import akka.actor.ActorSystem import akka.http.scaladsl.model._ import akka.http.scaladsl.Http...def downloadPicture(url: String)(implicit sys: ActorSystem): Future[ByteString] = { val dlRequest...def addPicuture(pid: String,seqno: Int, optDesc: Option[String] ,optWid:Option[Int
我们可以把集群客户端模式分成集群客户端ClusterClient和集群服务端ClusterClientReceptionist,从字面理解这就是个接待员这么个角色,负责接待集群外客户端发起的服务请求。...在具体应用中要注意sender()的具体意义:从提供服务的actor方面看,sender()代表ClusterClientReceptionist。...从发布消息的actor角度看,sender()代表的是DeadLetter。如果服务actor需要知道请求者具体地址,发布方可以把自己的地址嵌在发布的消息结构里。...val conf = ConfigFactory.load("client") val clientSystem = ActorSystem("ClientSystem",conf) /* 从...val conf = ConfigFactory.load("client") val clientSystem = ActorSystem("ClientSystem",conf) /* 从
在一个akka-cluster环境里,从数据调用的角度上,JDBC数据库与集群中其它节点是脱离的。这是因为JDBC数据库不是分布式的,不具备节点位置透明化特性。...因为我们已经明确选择了在akka-cluster集群环境里实施gRPC服务模式,通过akka-stream的流控制方式实现数据库操作的程序控制,所以在本次讨论里我们将示范说明gRPC-JDBC-Streaming...下面我们示范一下从客户端传送一个数据流(stream MemberRow),由服务端插入数据库操作。...我们从服务端读取MemberRow再传回服务端进行更新操作。...._ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.
上期说道:http/2还属于一种不算普及的技术协议,可能目前只适合用于内部系统集成,现在开始大面积介入可能为时尚早。...毕竟用akka-grpc做了些事情,想想还是再写这篇跟大家分享使用kka-grpc的过程。 我说过,了解akka-grpc的主要目的还是在protobuf的应用上。这是一种高效率的序列化协议。...akka-grpc应用一般从IDL文件里消息类型和服务函数的定义开始,如下面这个.proto文件示范: syntax = "proto3"; import "google/protobuf/wrappers.proto...= 3; } message Book { string ean = 1; string ver = 2; string isbn = 3; string title = 4; string...case ImgProcessor.ValidImgPro(img) => img case ImgProcessor.FailedImgPro(msg) => Picture(-1, ByteString.EMPTY
从OOP角度分析这很容易理解,下一段程序需要上一段程序的结果来继续运行。在上面的例子里我们需要先获取count然后把count塞进Document再把Document存入数据库。...decodeRequest { extractDataBytes { bytes => val futBytes = bytes.runFold(ByteString...:Option[String],sort:Option[String],fields:Option[String],top:Option[Int]): DBOResult[Seq[R]] = {...err) } } } } } MongoRoute.scala package com.datatech.rest.mongo import akka.http.scaladsl.server.Directives...._ import akka.http.scaladsl.model._ import akka.http.scaladsl.coding.Gzip import akka.stream.scaladsl
akka提供了一种基于节点运算资源负载的算法,在配置文件中定义: akka.extensions = [ "akka.cluster.metrics.ClusterMetricsExtension" ]...具体情况请参考下面测试程序的输出: package my.akka import akka.actor...."hello after PoisonPill" // [akka://app/user/parent/child-1] Message [java.lang.String] without..." %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-cluster" % akkaVersion...import akka.stream.Materializer import akka.stream.alpakka.mongodb.scaladsl._ import akka.stream.scaladsl
介绍 从Play2.5.x开始,Play使用Akka Streams实现流处理,废弃了之前的Enumerator/Iteratee Api。...根据官方文档描述,迁移至Akka Streams之后,Play2.5.x的整体性能提升了20%,性能提升相当可观。...JSONCollection]("qa") def exportDataStream = Action { implicit request => val source = Source.actorRef[ByteString...ByteString(list.map(qa => qa.question).mkString("\n") + "\n") Cursor.Cont(index + 1)...第10行foldBulks方法负责批量从MongoDB数据库读取查询结果,然后以消息形式将数据发送给sourceActor,最后发送一个Status.Success消息表明数据已经发送完毕。
领取专属 10元无门槛券
手把手带您无忧上云