SparkContext初始化过程

SparkContext在构造的过程中,已经完成了各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认的构造函数,而默认构造函数的代码直接在类定义中。

除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器,相关代码如下:

// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()

DAG调度与Task调度的区别是,DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler的初始化必须在Task调度器之后。

DAG与Task这种分离设计的好处是,Spark可以灵活设计自己的DAG调度,同时还能与其他资源调度系统结合,比如YARN、Mesos。

Task调度器本身的创建在createTaskScheduler函数中进行。根据Spark程序提交时指定的不同模式,可以启动不同类型的调度器。并且出于容错考虑,createTaskScheduler会返回一主一备两个调度器。以YARN cluster模式为例,主、备调度器对应不同类的实例,但是加载了相同的配置。下面摘录了createTaskScheduler函数的相关实现:

private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
        // 省略部分代码……
        master match {
        // 省略部分case代码……
        case "yarn-standalone" | "yarn-cluster" =>
            if (master == "yarn-standalone") {
                logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0.
                    Use \"yarn-cluster\" instead.")
             }
         // 主调度器
         val scheduler = try {
             val clazz = Class.forName("org.apache.spark.scheduler.
                 cluster.YarnClusterScheduler")
             val cons = clazz.getConstructor(classOf[SparkContext])
             cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
             case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
             }
         }
         // 备用调度器
         val backend = try {
             val clazz =

                  Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler Backend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend]
          } catch {
              case e: Exception => {
                  throw new SparkException("YARN mode not available ?", e)
              }
          }
          scheduler.initialize(backend)
          (backend, scheduler)
          // 省略部分case代码……
    }
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

Hive架构及Hive On Spark

(1)Table:每个表都对应在HDFS中的目录下,数据是经过序列化后存储在该目录中。同时Hive也支持表中的数据存储在其他类型的文件系统中,如NFS或本地文件...

1532
来自专栏斑斓

大数据 | 理解Spark的核心RDD

与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streami...

3659
来自专栏Spark生态圈

[spark] RDD缓存源码解析

我们可以利用不同的存储级别存储每一个被持久化的RDD。可以存储在内存中,也可以序列化后存储在磁盘上等方式。Spark也会自动持久化一些shuffle操作(如re...

1223
来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

2185
来自专栏Albert陈凯

3.4 RDD的计算

3.4 RDD的计算 3.4.1 Ta s k简介 原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为...

34410
来自专栏Jed的技术阶梯

Spark性能调优02-代码调优

代码调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以...

1122
来自专栏个人分享

Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

991
来自专栏祝威廉

Spark Streaming 不同Batch任务可以并行计算么?

其实Stage,Task都是Spark Core里就有的概念,Job 在Streaming和Spark Core里的概念则是不一致的。Batch则是Stream...

823
来自专栏CSDN技术头条

Spark之RDD详解

RDD 概念与特性 RDD是Spark最重要的抽象。spark统一建立在抽象的RDD之上。设计一个通用的编程抽象,使得spark可以应对各种场合的大数据情景。R...

2516
来自专栏Jed的技术阶梯

Spark-RDD持久化

使用不同参数的组合构造的实例被预先定义为一些值,比如MEMORY_ONLY代表着不存入磁盘,存入内存,不使用堆外内存,不进行序列化,副本数为1,使用persis...

933

扫码关注云+社区