首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >如何在spark上使用scalapb启动服务器/客户端grpc?

如何在spark上使用scalapb启动服务器/客户端grpc?
EN

Stack Overflow用户
提问于 2018-07-26 13:04:45
回答 1查看 794关注 0票数 1

我在spark上使用ScalaPB运行服务器/客户端时遇到了一个问题。

当我使用"sbt run“运行我的代码时,它完全可以工作得很好。我想使用spark运行这段代码,因为下一步我会导入spark模型来预测一些标签。但是当我把我的jar提交给spark时,他们给了我这样的错误。

代码语言:javascript
复制
   Exception in thread "main" io.grpc.ManagedChannelProvider$ProviderNotFoundException: 
No functional server found. Try adding a dependency on the grpc-netty artifact

这是我的build.sbt

代码语言:javascript
复制
scalaVersion := "2.11.7"

PB.targets in Compile := Seq(
  scalapb.gen() -> (sourceManaged in Compile).value
)

val scalapbVersion =
    scalapb.compiler.Version.scalapbVersion
val grpcJavaVersion =
    scalapb.compiler.Version.grpcJavaVersion


libraryDependencies ++= Seq(

    // protobuf
    "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",

    //for grpc
    "io.grpc" % "grpc-netty" % grpcJavaVersion ,
    "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion
)

assemblyMergeStrategy in assembly := {
       case PathList("META-INF", xs @ _*) => MergeStrategy.discard
       case x => MergeStrategy.first
   }

使用遮阳板仍然不起作用

代码语言:javascript
复制
assemblyShadeRules in assembly := Seq(ShadeRule.rename("com.google.**" -> "shadegoogle.@1").inAll)

这是我的main

代码语言:javascript
复制
import java.util.logging.Logger
import io.grpc.{Server, ServerBuilder}
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.sql.SparkSession
import testproto.test.{Email, EmailLabel, RouteGuideGrpc}
import scala.concurrent.{ExecutionContext, Future}

object HelloWorldServer {
  private val logger = Logger.getLogger(classOf[HelloWorldServer].getName)

  def main(args: Array[String]): Unit = {
    val server = new HelloWorldServer(ExecutionContext.global)
    server.start()
    server.blockUntilShutdown()
  }
  private val port = 50051
}

class HelloWorldServer(executionContext: ExecutionContext) {
  self =>
  private[this] var server: Server = null

  private def start(): Unit = {
    server = ServerBuilder.forPort(HelloWorldServer.port).addService(RouteGuideGrpc.bindService(new RouteGuideImpl, executionContext)).build.start
    HelloWorldServer.logger.info("Server started, listening on " + HelloWorldServer.port)
    sys.addShutdownHook {
      System.err.println("*** shutting down gRPC server since JVM is shutting down")
      self.stop()
      System.err.println("*** server shut down")
    }
  }

  private def stop(): Unit = {
    if (server != null) {
      server.shutdown()
    }
  }

  private def blockUntilShutdown(): Unit = {
    if (server != null) {
      server.awaitTermination()
    }
  }

  private class RouteGuideImpl extends RouteGuideGrpc.RouteGuide {
    override def getLabel(request: Email): Future[EmailLabel] = {
      val replay = EmailLabel(emailId = request.emailId, label = "aaaaa")
      Future.successful(replay)
    }
  }
}

谢谢

EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51531339

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档