由于jdbc数据库不支持分布式的运算模式,所以从数据交换的角度上它与集群环境是脱离的:jdbc数据不可以从集群中的任何节点获取。所以只有通过基于http的一种服务来向其它节点提供数据。...我首先考虑了akka-http,在准备过程中接触了gRPC,发现gRPC更加适合跨jvm的程序控制,主要因为gRPC支持双向的流控制。...首先示范一个传统的Unary(request/response)模式实现:从客户端向服务端发出一个Query指令、服务端按指令从JDBC数据库中返回DataRows。...下面是.proto文件中对应的IDL定义: message JDBCDataRow { string year = 1; string state = 2; string county = 3;..."scalapb/scalapb.proto"; package grpc.jdbc.services; option (scalapb.options) = { // use a custom
分布式程序运算是一种水平扩展(scale-out)运算模式,其核心思想是能够充分利用服务器集群中每个服务器节点的计算资源,包括:CPU、内存、硬盘、IO总线等。...当然,任务分派是通过算法实现的,包括所有普通router的routing算法如:round-robin, random等等。 ...对正常停止动作,如PoisonPill, context.stop作用:重新构建新的实例并启动。 OnFailure:不响应child-actor正常停止,任其终止。..."scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name...import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = {
在具体应用中要注意sender()的具体意义:从提供服务的actor方面看,sender()代表ClusterClientReceptionist。...ReceptionistListener],receptionist),"mongo-event-listner") system } } MongoAdder处于同一个集群ClusterSystem中。..."java.lang.String" = java "scalapb.GeneratedMessage" = proto } } remote.netty.tcp.port=..."scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name..."scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name
akka-grpc应用一般从IDL文件里消息类型和服务函数的定义开始,如下面这个.proto文件示范: syntax = "proto3"; import "google/protobuf/wrappers.proto..."; import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = {...shopId:posId就是代表为某用户构建的entityId,这个是通过用户在Request中提供的MetaData参数中jwt解析得出的。 可以看到,具体服务提供是通过集群的分片实现的。...[ClusterSharding.ShardCommand],mgoHosts: List[String], entityId: String, trace: Boolean, keepAlive: FiniteDuration..." = proto } } } grpc server 基本上是个标准模块,不同的只是service参数: class gRPCServer(host: String, port: Int)
具体使用方式是在集群所有节点部署ClusterSingletonManager,由集群中的leader节点选定其中一个节点并指示上面的ClusterSingletonManager运行一个cluster...} } 例子里的protobuf消息是由scalapb从.proto文件中自动产生的。...下面是这次讨论中的示范源代码: project/scalapb.sbt addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") libraryDependencies.../wrappers.proto"; import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options..."scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name
然后一如既往,我们使用了streaming编程模式。...值得注意的是这里我们尝试在stream Flow里运算另一个Flow,如:IfExists函数里运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。...下面就是本次讨论涉及的完整源代码: project/scalapb.sbt addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18") resolvers...syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google/protobuf/any.proto"; import..."scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name // package_name
gRPC是google开源提供的一个RPC软件框架,它的特点是极大简化了传统RPC的开发流程和代码量,使用户可以免除许多陷阱并聚焦于实际应用逻辑中。...在一个.proto字符类文件中用IDL来描述用户自定义的数据类型和服务 2、用protoc编译器编译文件并产生自定义数据类型和服务的api源代码 3、在server端实现.proto中定义的服务函数 4...在本篇讨论中我们先示范Unary-service的编程流程,下面是.proto文件内容: syntax = "proto3"; import "google/protobuf/wrappers.proto...通过对.proto文件进行编译后产生文件中包括一个HelloWorldGrpc.scala文件,里面提供了一些重要的api: trait HelloWorld -> 用于实现HelloWorld服务的trait...Future.successful(Greeting(message = s"Hello $greeter, ${request.msg}")) } } 可以看到我们直接使用了IDL描述的自定义数据类型如
前面我们完成了一个CQRS模式的数据采集(录入)平台。可以预见:数据的产生是在线下各式各样的终端系统中,包括web、桌面、移动终端。...下面是这个例子的.proto定义文件: syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google/protobuf.../any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package...google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a...但NettyChannelBuilder还具备更多的设置参数,如ssl/tls设置。 3、还有:因为客户端是按照顺序来发送操作指令的,每发一个指令,等待返回结果后才能再发下一个指令。
当然,最基本的是通过对JWT的验证机制可以控制客户端对某些功能的使用权限。...客户端提交身份验证请求返回JWT可以用一个独立的服务函数实现,如下面.proto文件里的GetAuthToken: message PBPOSCredential { string userid.../scalapb.proto or anything from // google/protobuf/*.proto //libraryDependencies += "com.thesamet.scalapb...protobuf/posmessages.proto syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google.../protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom
与前面我们介绍过的JDBC-streaming和Cassandra-streaming对应操作指令的处理相同,MGO-streaming也是是通过一个Context对象来描述操作方式和内容细节的,MGOContext...这两个函数的实现包含在文章后面提供的源代码中。...MongoDB的.proto文件idl定义如下: syntax = "proto3"; import "google/protobuf/wrappers.proto"; import "google/...protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom..."; import "scalapb/scalapb.proto"; option (scalapb.options) = { // use a custom Scala package name
我们上次提过:由于java-object-serialization会把一个java-object的类型信息、实例值、它所包含的其它类型描述信息等都写入序列化的结果里,所以会占据较大空间,传输数据的效率相对就低了...在akka中使用自定义序列化方法包括下面的这些步骤: 1、在.proto文件中对消息类型进行IDL定义 2、用ScalaPB编译IDL文件并产生scala源代码。...这些源代码中包括了涉及的消息类型及它们的操作方法 3、在akka程序模块中import产生的classes,然后直接调用这些类型和方法 4、按akka要求编写序列化方法 5、在akka的.conf文件里...actor.serializers段落中定义akka的默认serializer 下面的build.sbt文件里描述了程序结构: lazy val commonSettings = Seq( name...注意依赖项中的scalapb.runtime。PB.targets指明了产生源代码的路径。
gRPC Streaming的操作对象由服务端和客户端组成。在一个包含了多个不同服务的集群环境中可能需要从一个服务里调用另一个服务端提供的服务。...如果run这个stream得到的结果应该是一个描述完整移动路径的消息。从请求-服务角度来描述:我们可以把每个节点消息更新处理当作某种完整的数据处理过程。...我们把共用的消息统一放到一个common.proto文件里: syntax = "proto3"; package sdp.grpc.services; message HelloMsg { string..."; import "google/protobuf/any.proto"; import "scalapb/scalapb.proto"; option (scalapb.options) = {..."; import "common.proto"; import "cql/cql.proto"; import "jdbc/jdbc.proto"; import "mgo/mgo.proto"; 下面我们把最核心的服务实现挑出来讲解一下
在一个akka-cluster环境里,从数据调用的角度上,JDBC数据库与集群中其它节点是脱离的。这是因为JDBC数据库不是分布式的,不具备节点位置透明化特性。...这也是所谓的BiDi-Streaming模式,在jdbc.proto的服务描述如下: service JDBCServices { rpc runQuery(JDBCQuery) returns (...具体实现在附件的JDBCEngine.scala中。..."; import "scalapb/scalapb.proto"; package grpc.jdbc.services; option (scalapb.options) = { // use...bytes.toByteArray)) val value = ois.readObject() ois.close() value.asInstanceOf[A] } } 其它部分的源代码和系统设置可以从上次的讨论稿中获取
用户首先在.proto文件中用IDL来定义系统中各种需要进行交换的数据类型。然后用protoc编译器自动产生相关的源代码,里面包括了完整的序列化处理函数。...在一个集成的系统环境内,protobuf数据必须保持与所有系统的松散耦合,不能对这些用户系统有任何依赖。...这样把protobuf数据类型和相关的序列化/反序列化函数打成一个独立的包,由用户系统各自引用就是一种最佳解决方案了。 下面示范产生一个独立的protobuf包。...我们再随便建个.proto文件: syntax = "proto3"; // Brought in from scalapb-runtime import "scalapb/scalapb.proto..."; import "google/protobuf/wrappers.proto"; package proto.microservices; message Added { int32
我们首先在.proto文件里用IDL描述Server-Streaming服务: /* * responding stream of increment results */ service SumOneToMany...我们看看protoc把IDL描述的服务函数变成了什么样的scala函数: def addOneToMany(request: SumRequest, responseObserver: StreamObserver...scalaPB自动产生scala代码中的addManyToOne函数款式如下: def addManyToOne(responseObserver: StreamObserver[SumResponse...注意:虽然在.proto文件中AddManyToOne的返回结果是单个SumResponse,但产生的scala函数则提供了一个StreamObserver[SumResponse]类型,所以需要谨记只能调用一次...) -> (sourceManaged in Compile).value ) src/main/protobuf/sum.proto syntax = "proto3"; package learn.grpc.services
假如我们有个业务系统也是在cassandra上的,那么reader就需要把从日志读出来的事件恢复成cassandra表里的数据行row。...) 这个消息描述了读端需要读取的日志记录范围和persistenceId。..." %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf", "com.thesamet.scalapb..." %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion ) // (optional) If you need scalapb.../scalapb.proto or anything from // google/protobuf/*.proto //libraryDependencies += "com.thesamet.scalapb
那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。...,如: Flow[Request] .throttle(1, 10.millis, 1, ThrottleMode.Shaping) .map(computeResponse) 在客户端我们可以直接经客户端...先从Unary-Call开始:下面是.proto文件的IDL服务描述: syntax = "proto3"; package learn.grpc.akka.stream.services; message...IDL的描述如下: service SumNumbers { rpc SumPair(NumPair) returns (SumResult) {} rpc GenIncsFrom(Num)..."com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion, "io.monix" %% "monix" % "2.3.0",
在http/1应用中对二进制文件的传输交换有诸多限制和不便,特别是效率方面的问题。在protobuf这种序列化模式中对任何类型的数据格式都一视同仁,可以很方便的实现图片等文件的上传下载。...与scalaPB一样,akka-grpc也是通过编译IDL(.proto)文件用相应的插件(plugin)产生相关的scala类和服务函数代码。...实际上akka-grpc产生代码的plugin还是采用scalaPB的插件,这个过程已经在scalaPB系列博客里详细介绍过了。...数据类型和服务函数用IDL定义的.proto文件内容如下: syntax = "proto3"; //#options option java_multiple_files = true; //option...所以,akka-grpc并没有提供对OAuth2规范身份验证的支持。在这个例子里我们就只能进行基本的身份证明(如店号、机器号等),但身份验证过程的安全性就不做任何加密操作了。
使用gRPC作为云平台和移动前端的连接方式,网络安全应该是必须考虑的一个重点。gRPC是支持ssl/tls安全通讯机制的。用了一个周末来研究具体使用方法,实际上是一个周末的挖坑填坑过程。..." %% "compilerplugin" % "0.9.0-M6" 在sbt中执行dependencyTree: ~/scala/intellij/learn-grpc> sbt [info] Loading..." % scalapb.compiler.Version.scalapbVersion ) // (optional) If you need scalapb/scalapb.proto or anything...from // google/protobuf/*.proto //libraryDependencies += "com.thesamet.scalapb" %% "scalapb-runtime"...判断正确,是证书的问题。再研究一下证书是怎么产生的,尝试按文档指引重新产生这些自签证书:可惜的是好像还有些文件是缺失的,如serial。
,而且才刚刚达到枯浅的理解水平,如果在实际应用中能够真正调动自然,则需要添加更多的努力了。 ...在scala编程世界里我们可以用scalaPB来实现对gRPC和protobuf的使用。...google gRPC的使用流程如下: 1、创建一个.proto文件,用IDL语言(Interface Definition Language)定义数据类型和服务 2、对.proto文件进行编译后产生相关的...java数据类型和抽象服务框架 3、在java编程中可以直接调用编译产生的数据类型及对数据进行操作 4、继承并实现产生的服务类 scalaPB是一个scala版的protobuf编译器。...编译.proto文件后产生scala语言的数据类型和抽象服务类,这样我们就可以在scala环境里使用protobuf和gRPC实现微服务的集成编程了。
领取专属 10元无门槛券
手把手带您无忧上云