专栏首页大数据-数据人生SparkContext源码解读
原创

SparkContext源码解读

在SparkContext中主要做几件事情,如下:

1 创建TaskSceduler

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

TaskSceduler在具体创建的时候,是由不同发布模式比如standalone、yarn、mesos决定的,返回一个SchedulerBackend.

private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    //Standalone模式
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case MESOS_REGEX(mesosUrl) =>
      MesosNativeLibrary.load()
      val scheduler = new TaskSchedulerImpl(sc)
      val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true)
      val backend = if (coarseGrained) {
        new MesosCoarseGrainedSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager)
      } else {
        new MesosFineGrainedSchedulerBackend(scheduler, sc, mesosUrl)
      }
      scheduler.initialize(backend)
      (backend, scheduler)

    //对其他资源管理方式(除了local和standelone{spark://}外的mesos,yarn,kubernetes【外部资源管理器】的资源管理方式)的处理。
    case masterUrl =>
      //这句很重要,实现了外部资源管理器,具体实现访问为Yarn的YarnClusterManager
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }

SparkContext这里启动TaskSchelerImpl之后,具体进行初始化任务的时候是通过以下代码调用的:

scheduler.initialize(backend)

TaskScheduler的Initalize方法:

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}

TaskScheluerImpl的一个重要功能就是决定job的调度顺序,启动任务推测执行机制。

另外在SparkContext中还有个地方调用TaskScheluerImpl方法:

_taskScheduler.start()

调用的方法如下:

override def start() {
  //调用SchedulerBackend.start()方法
  backend.start()
  //在这里会判断spark的推测执行机制
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JAVA中的加密算法之双向加密(一)

           加密,是以某种特殊的算法改变原有的信息数据,使得未授权的用户即使获得了已加密的信息,但因不知解密的方法,仍然无法了解信息的内容。大体上分为双向加密...

    幽鸿
  • 管理SQL Server 2008 数据库角色

    角色是SQL Server 2008用来集中管理数据库或者服务器的权限。数据库管理员将操作数据库的权限赋予角色。然后,数据库管理员再将角色赋给数据库用户或者登录...

    幽鸿
  • Hive decimal类型

    Hive也有decimal类型,并且可以指定长度,最好指定长度吧。刚开始以为Hive的decimal类型和MySql一致。后来发现想错了,还是个大坑!

    幽鸿
  • 手机号验证码登录性能测试

    这两天遭遇了手机号登录相关的压测需求,算是比较棘手的。主要原因有两个,第一:之前从来没有接手过这个项目,不熟悉各种规则;第二:数据量偏大,需要开发配合协调校验规...

    八音弦
  • 手机号验证码登录性能测试

    这两天遭遇了手机号登录相关的压测需求,算是比较棘手的。主要原因有两个,第一:之前从来没有接手过这个项目,不熟悉各种规则;第二:数据量偏大,需要开发配合协调校验规...

    八音弦
  • 【原生态】Http请求数据 与 发送数据

    Http请求都是通过输入输出流来进行操作的,首先要制定GET或者POST,默认是GET,在安全和数据量较大情况下请使用post

    肖哥哥
  • PyTorch更新了:支持Windows,新增零维张量

    PyTorch今天发布了v0.4.0版本,网友们反响非常热烈,甚至有人说:感觉就像提前过圣诞~

    量子位
  • (开源)手机app控制c51单片机,附微信小程序控制

    原理简述:利用发布订阅模式。第一步,新建主题,第二步,stc89c51通过esp8266订阅这个主题。第三步、通过app往这个主题发消息。由于单片机订阅了这个主...

    巴法
  • 计算机视觉中的细节问题(九)

    对于Bounding Box的列表B及其对应的置信度S,采用下面的计算方式。选择具有最大score的检测框M,将其从B集合中移除并加入到最终的检测结果D中。通常...

    于小勇
  • 用winsw让任何Windows程序都能运行为服务

    winsw介绍 有时候我们需要在Windows下开机运行某些程序,这对于有图形界面的程序来说一般不是什么事,在选项中选中开机启动,然后它们就可以自动运行了。但是...

    乐百川

扫码关注云+社区

领取腾讯云代金券