前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SparkContext初始化过程

SparkContext初始化过程

作者头像
天策
发布2018-06-22 13:56:47
6530
发布2018-06-22 13:56:47
举报
文章被收录于专栏:行者悟空行者悟空

SparkContext在构造的过程中,已经完成了各项服务的启动。因为Scala语法的特点,所有构造函数都会调用默认的构造函数,而默认构造函数的代码直接在类定义中。

除了初始化各类配置、日志之外,最重要的初始化操作之一是启动Task调度器和DAG调度器,相关代码如下:

代码语言:javascript
复制
// 创建并启动Task调度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.send(TaskSchedulerIsSet)
// 创建DAG调度器,并引用之前创建的Task调度器之后,
// 再启动Task调度器
_taskScheduler.start()

DAG调度与Task调度的区别是,DAG是最高层级的调度,为每个Job绘制出一个有向无环图(简称DAG),跟踪各Stage的输出,计算完成Job的最短路径,并将Task提交给Task调度器来执行。而Task调度器只负责接受DAG调度器的请求,负责Task的实际调度执行,所以DAGScheduler的初始化必须在Task调度器之后。

DAG与Task这种分离设计的好处是,Spark可以灵活设计自己的DAG调度,同时还能与其他资源调度系统结合,比如YARN、Mesos。

Task调度器本身的创建在createTaskScheduler函数中进行。根据Spark程序提交时指定的不同模式,可以启动不同类型的调度器。并且出于容错考虑,createTaskScheduler会返回一主一备两个调度器。以YARN cluster模式为例,主、备调度器对应不同类的实例,但是加载了相同的配置。下面摘录了createTaskScheduler函数的相关实现:

代码语言:javascript
复制
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
        // 省略部分代码……
        master match {
        // 省略部分case代码……
        case "yarn-standalone" | "yarn-cluster" =>
            if (master == "yarn-standalone") {
                logWarning("\"yarn-standalone\" is deprecated as of Spark 1.0.
                    Use \"yarn-cluster\" instead.")
             }
         // 主调度器
         val scheduler = try {
             val clazz = Class.forName("org.apache.spark.scheduler.
                 cluster.YarnClusterScheduler")
             val cons = clazz.getConstructor(classOf[SparkContext])
             cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
         } catch {
             case e: Exception => {
                throw new SparkException("YARN mode not available ?", e)
             }
         }
         // 备用调度器
         val backend = try {
             val clazz =

                  Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler Backend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf [CoarseGrainedSchedulerBackend]
          } catch {
              case e: Exception => {
                  throw new SparkException("YARN mode not available ?", e)
              }
          }
          scheduler.initialize(backend)
          (backend, scheduler)
          // 省略部分case代码……
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年03月16日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档