SparkContext
在构造的过程中,已经完成了各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认的构造函数,而默认构造函数的代码直接在类定义中。
除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器,相关代码如下:
// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()
DAG调度与Task调度的区别是,DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler
的初始化必须在Task调度器之后。
DAG与Task这种分离设计的好处是,Spark可以灵活设计自己的DAG调度,同时还能与其他资源调度系统结合,比如YARN、Mesos。
Task调度器本身的创建在createTaskScheduler
函数中进行。根据Spark程序提交时指定的不同模式,可以启动不同类型的调度器。并且出于容错考虑,createTaskScheduler
会返回一主一备两个调度器。以YARN cluster模式为例,主、备调度器对应不同类的实例,但是加载了相同的配置。下面摘录了createTaskScheduler
函数的相关实现:
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
// 省略部分代码……
master match {
// 省略部分case代码……
case "yarn-standalone" | "yarn-cluster" =>
if (master == "yarn-standalone") {
logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0.
Use \"yarn-cluster\" instead.")
}
// 主调度器
val scheduler = try {
val clazz = Class.forName("org.apache.spark.scheduler.
cluster.YarnClusterScheduler")
val cons = clazz.getConstructor(classOf[SparkContext])
cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
// 备用调度器
val backend = try {
val clazz =
Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler Backend")
val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend]
} catch {
case e: Exception => {
throw new SparkException("YARN mode not available ?", e)
}
}
scheduler.initialize(backend)
(backend, scheduler)
// 省略部分case代码……
}
}