前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用

Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用

作者头像
绿巨人
发布2018-05-18 11:08:01
1.1K0
发布2018-05-18 11:08:01
举报
文章被收录于专栏:绿巨人专栏绿巨人专栏

前言

Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。 在Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。

akka是什么?

akka的作用

akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。 个人理解是: resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。 elastic:是指对资源利用方面的弹性。 因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。 akka只是一个类库,一个工具,并没有提供一个平台。

akka的运行模式和用例

  • akka有两种运行模式:
  • As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者WEB-INF/lib
  • As an application: 也称为micro system。
  • akka的用例 akka的用例很多,可以参照Examples of use-cases for Akka.

本文中的用例

在本文中,一个Spark + akka的环境里,akka被用于as an application模式下。 我们会创建一个akka工程,含有两个应用:

  • akka host application 建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。 部分actor使用了spark的云计算功能。 这是一个spark的应用。
  • akka client application 调用host application上特定的actor。

我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。

项目结构和文件说明

说明

这个工程包含了两个应用。 一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。 一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。

文件结构

AkkaSampleApp    # 项目目录
|-- build.bat    # build文件    
|-- src
    |-- main
        |-- resources
            |-- application.conf   # Akka Server应用的配置文件
            |-- client.conf        # Akka Client应用的配置文件
        |-- scala
            |-- ClientActor.scala       # Akka Client的Actor:提供了一种调用Server Actor的方式。
            |-- ClientApp.scala         # Akka Client应用
            |-- ProductionReaper.scala  # Akka Shutdown pattern的实现者
            |-- Reaper.scala            # Akka Shutdown pattern的Reaper抽象类
            |-- ServerActor.scala       # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。
            |-- ServerApp.scala         # Akka Server应用

构建工程目录

可以运行:

mkdir AkkaSampleApp
mkdir -p /AkkaSampleApp/src/main/resources
mkdir -p /AkkaSampleApp/src/main/scala

代码

build.sbt

name := "akka-sample-app"
 
version := "1.0"
 
scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
 
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.10",
  "com.typesafe.akka" %% "akka-remote" % "2.4.10",
  "org.apache.spark" %% "spark-core" % "2.0.0"
)

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

application.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

cient.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

注:port = 0表示这个端口号会自动生成一个。

ClientActor.scala

import akka.actor._
import akka.event.Logging

class ClientActor(serverPath: String) extends Actor {
  val log = Logging(context.system, this)
  val serverActor = context.actorSelection(serverPath)

  def receive = {
    case msg: String =>
        log.info(s"ClientActor received message '$msg'")
        serverActor ! 10000L
  }
}

ClientApp.scala

import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import akka.util._

import java.util.concurrent.TimeUnit

import scala.concurrent._
import scala.concurrent.duration._

object ClientApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
    
    // get the remote actor via the server actor system's address
    val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
    val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))

    // invoke the remote actor via a client actor.
    // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
    // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")

    buildReaper(system, actor)

    // tell
    actor ! 10000L
    
    waitShutdown(system, actor)
  }

  private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
    import Reaper._
    val reaper = system.actorOf(Props(classOf[ProductionReaper]))
    
    // Watch the action
    reaper ! WatchMe(actor)
  }

  private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
    // trigger the shutdown operation in ProductionReaper
    system.stop(actor)
    
    // wait to shutdown
    Await.result(system.whenTerminated, 60.seconds)
  }
}

ProductionReaper.scala

当所有的Actor停止后,终止Actor System。

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = {
    context.system.terminate()
  }
}

Reaper.scala

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

ServerActor.scala

提供一个求1到n平方和的MapReduce计算。

import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

class ServerActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case n: Long =>
        squareSum(n)
  }

  private def squareSum(n: Long): Long = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val squareSum = sc.parallelize(1L until n).map { i => 
      i * i
    }.reduce(_ + _)

    log.info(s"============== The square sum of $n is $squareSum. ==============")

    squareSum
  }
}

ServerApp.scala

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object ServerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ServerActorSystem")
    val actor = system.actorOf(Props[ServerActor], name = "serverActor")
  }
}

构建工程

进入目录AkkaSampleApp。运行:

sbt package

第一次运行时间会比较长。

测试应用

启动Spark服务

  • 启动spark集群master server
$SPARK_HOME/sbin/start-master.sh

master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

  • 启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

启动Akka Server应用

运行:

$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

如果出现java.lang.NoClassDefFoundError错误, 请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境, 确保akka的包在Spark中设置好了。 注:可以使用Ctrl+C来中断这个Server应用。

启动Akka Client应用

新启动一个终端,运行:

java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp
# or
# $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

然后:看看Server应用是否开始处理了。

总结

Server应用需要Spark的技术,因此,是在Spark环境中运行。 Clinet应用,可以是一个普通的Java应用。

下面请看

至此,我们已经写好了一个spark集群+akka+scala的应用。下一步请看: Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

参照

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-10-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • akka是什么?
    • akka的作用
      • akka的运行模式和用例
      • 本文中的用例
      • 项目结构和文件说明
        • 说明
          • 文件结构
          • 构建工程目录
          • 代码
            • build.sbt
              • application.conf
                • cient.conf
                  • ClientActor.scala
                    • ClientApp.scala
                      • ProductionReaper.scala
                        • Reaper.scala
                          • ServerActor.scala
                            • ServerApp.scala
                            • 构建工程
                            • 测试应用
                              • 启动Spark服务
                                • 启动Akka Server应用
                                  • 启动Akka Client应用
                                  • 总结
                                  • 下面请看
                                  • 参照
                                  领券
                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档