SparkContext初始化过程

SparkContext在构造的过程中,已经完成了各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认的构造函数,而默认构造函数的代码直接在类定义中。

除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器,相关代码如下:

// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()

DAG调度与Task调度的区别是,DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler的初始化必须在Task调度器之后。

DAG与Task这种分离设计的好处是,Spark可以灵活设计自己的DAG调度,同时还能与其他资源调度系统结合,比如YARN、Mesos。

Task调度器本身的创建在createTaskScheduler函数中进行。根据Spark程序提交时指定的不同模式,可以启动不同类型的调度器。并且出于容错考虑,createTaskScheduler会返回一主一备两个调度器。以YARN cluster模式为例,主、备调度器对应不同类的实例,但是加载了相同的配置。下面摘录了createTaskScheduler函数的相关实现:

private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
        // 省略部分代码……
        master match {
        // 省略部分case代码……
        case "yarn-standalone" | "yarn-cluster" =>
            if (master == "yarn-standalone") {
                logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0.
                    Use \"yarn-cluster\" instead.")
             }
         // 主调度器
         val scheduler = try {
             val clazz = Class.forName("org.apache.spark.scheduler.
                 cluster.YarnClusterScheduler")
             val cons = clazz.getConstructor(classOf[SparkContext])
             cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
             case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
             }
         }
         // 备用调度器
         val backend = try {
             val clazz =

                  Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler Backend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend]
          } catch {
              case e: Exception => {
                  throw new SparkException("YARN mode not available ?", e)
              }
          }
          scheduler.initialize(backend)
          (backend, scheduler)
          // 省略部分case代码……
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

41312
来自专栏简单聊聊Spark

Spark内核分析之Shuffle操作流程(非常重要)

        如题,我们来分析一下spark的shuffle操作原理;为什么说其非常重要,是因为shuffle操作是我们在Spark调优中非常重要的一环,对s...

953
来自专栏Jed的技术阶梯

zookeeper编程02-服务器上下线动态感知

NameNode判断DataNode是否下线的时间太长了,利用zookeeper实现服务器上下线动态感知

962
来自专栏Spark学习技巧

Flink与Spark Streaming在与kafka结合的区别!

本文主要是想聊聊flink与kafka结合。当然,单纯的介绍flink与kafka的结合呢,比较单调,也没有可对比性,所以的准备顺便帮大家简单回顾一下Spark...

812
来自专栏Albert陈凯

Spark详解01概览|Spark部署|执行原理概览Job 例子

概览 拿到系统后,部署系统是第一件事,那么系统部署成功以后,各个节点都启动了哪些服务? 部署图 ? Spark部署图 从部署图中可以看到 整个集群分为 Mast...

3395
来自专栏Spark学习技巧

必读:Spark与kafka010整合

SparkStreaming与kafka010整合 读本文之前,请先阅读之前文章: 必读:再讲Spark与kafka 0.8.2.1+整合 Spark Str...

5017
来自专栏牛肉圆粉不加葱

Spark executor模块① - 主要类以及创建 AppClient

SchedulerBackend 在 Standalone 模式下的 SchedulerBackend 的实现是 StandaloneSchedulerBack...

421
来自专栏Albert陈凯

Spark概要掌握情况自我核查

1、Spark目前只持哪哪种语言的API? Java, Scala, Python, R. Ref: http://spark.apache.org/ 2、R...

2373
来自专栏用户画像

3.1.5 内存管理

在单道批处理系统阶段,一个系统一个时间段内只执行一个程序,内存的分配及其简单,仅分配给当前运行进程即可。而引入了多道程序并发执行之后,进程之间共享的不仅仅是处理...

663
来自专栏个人分享

Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

821

扫码关注云+社区