Spark DAG调度

SparkContext在初始化时,创建了DAG调度与Task调度来负责RDD Action操作的调度执行。

DAGScheduler

DAGScheduler负责Spark的最高级别的任务调度,调度的粒度是Stage,它为每个Job的所有Stage计算一个有向无环图,控制它们的并发,并找到一个最佳路径来执行它们。具体的执行过程是将Stage下的Task集提交给TaskScheduler对象,由它来提交到集群上去申请资源并最终完成执行。

DAGScheduler的定义位于scheduler/DAGScheduler.scala中。下面是它的类声明,初始化时除了需要一个SparkContext对象外,最重要的是需要输入一个TaskScheduler对象来负责Task的执行:

private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
extends Logging {
    ...
}
1.runJob过程

所有需要执行的RDD Action,都会调用SparkContext.runJob来提交任务,而SparkContext.runJob调用的是DAGScheduler.runJob。下面是runJob的定义:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, allowLocal,
        resultHandler, properties)
    waiter.awaitResult() match {
        case JobSucceeded =>
            logInfo("Job %d finished: %s, took %f s".format
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case JobFailed(exception: Exception) =>
            logInfo("Job %d failed: %s, took %f s".format
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        throw exception
    }
}

runJob调用submitJob提交任务,并等待任务结束。

任务提交后的处理过程大概如下。

(1) submitJob生成新的Job ID,发送消息JobSubmitted

(2) DAG收到JobSubmitted消息,调用handleJobSubmitted来处理。

(3) handleJobSubmitted创建一个ResultStage,并使用submitStage来提交这个ResultStage

上面的过程看起来没完,实际上大的过程已经结束了,猫腻在submitStage中。Spark的执行过程是“懒惰”(lazy)的,这在这里得到了完整的体现。任务提交时,不是按Job的先后顺序提交的,而是倒序的。每个Job的最后一个操作是Action操作,DAG把这最后的Action操作当作一个Stage,首先提交,然后逆向逐级递归填补缺少的上级Stage,从而生成一棵实现最后Action操作的最短的(因为都是必须的)有向无环图,然后再从头开始计算。

submitStage方法的实现代码如下所示:

// 提交Stage之前,先递归提交所缺失的父Stage
private def submitStage(stage: Stage) {
    val jobId = activeJobForStage(stage)
    if (jobId.isDefined) {
        logDebug("submitStage(" + stage + ")")
        if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage))
        {
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            if (missing.isEmpty) {
                // 仅在所有缺失的父Stage都提交执行了,才开始提交自己
                logInfo("Submitting " + stage + " (" + stage.rdd + "),
                    which has no missing parents")
                submitMissingTasks(stage, jobId.get)
            } else {
                for (parent <- missing) {
                    submitStage(parent)
                }
                waitingStages += stage
            }
        }
    } else {
        abortStage(stage, "No active job for stage " + stage.id)
    }
}

可以看到,这是一个逆向递归的过程,先查找所有缺失的上级Stage并提交,待所有上级Stage都提交执行了,才轮到执行当前Stage对应的Task。

查找上级Stage的过程,其实就是递归向上遍历所有RDD依赖列表并生成Stage的过程,代码如下所示:

private def getMissingParentStages(stage: Stage): List[Stage] = {
    val missing = new HashSet[Stage]
    val visited = new HashSet[RDD[_]]
    // 这里手工维护一个堆栈,避免递归访问过程中的栈溢出错误
    val waitingForVisit = new Stack[RDD[_]]
    def visit(rdd: RDD[_]) {
        if (!visited(rdd)) {
            visited += rdd
            if (getCacheLocs(rdd).contains(Nil)) {
                for (dep <- rdd.dependencies) {
                    dep match {
                        case shufDep: ShuffleDependency[_, _, _] =>
                            val mapStage = getShuffleMapStage(shufDep, stage.jobId)
                            if (!mapStage.isAvailable) {
                                missing += mapStage
                            }
                        case narrowDep: NarrowDependency[_] =>
                            waitingForVisit.push(narrowDep.rdd)
                    }
                }
            }
        }
    }
    waitingForVisit.push(stage.rdd)
    while (waitingForVisit.nonEmpty) {
        visit(waitingForVisit.pop())
    }
    missing.toList
}

熟悉数据结构的读者不难发现,遍历的过程是非递归的层序遍历(不是前序、中序或后序),使用了一个堆栈来协助遍历,而且保证了层序的顺序与DAG中的依赖顺序一致。

2.Stage

值得注意的是,仅对依赖类型是ShuffleDependency的RDD操作创建Stage,其他的RDD操作并没有创建Stage。这里我们补一下Stage的概念。前面讲RDD时提到过,RDD操作有两类依赖:一类是窄依赖,一个RDD分区只依赖上一个RDD的部分分区,而且这些分区都在相同的节点上;另外一类依赖是Shuffle依赖,一个RDD分区可能会依赖上一级RDD的全部分区,一个典型的例子是groupBy聚合操作。这两类操作在计算上有明显的区别,窄依赖都在同一个节点上进行计算,而Shuffle依赖跨越多个节点,甚至所有涉及的计算节点。因此,DAG在调度时,对于在相同节点上进行的Task计算,会合并为一个Stage。

所以,只有两种情况下会生成新的Stage:一类是依赖类型是Shuffle的Transformation操作会触发生成新的Stage,几乎所有的ByKey操作都是,比如reduceByKeygroupByKey;另外一类是Action操作,是为了生成默认的Stage,这样即便没有Shuffle类操作,保证至少有一个Stage。

总结一下,各Stage之间以Shuffle为分界线。

TaskScheduler

相对DAGScheduler而言,TaskScheduler是低级别的调度接口,允许实现不同的Task调度器。目前,已经实现的Task调度器除了自带的以外,还有YARN和Mesos调度器。每个TaskScheduler对象只服务于一个SparkContext的Task调度。TaskSchedulerDAGScheduler的每个Stage接收一组Task,并负责将它们发送到集群上,运行它们,如果出错还会重试,最后返回消息给DAGScheduler

TaskScheduler的主要接口包括一个钩子接口(也称hook,表示定义好之后,不是用户主动调用的),被调用的时机是在初始化完成之后和调度启动之前:

def postStartHook() { }

还有启动和停止调度的命令:

def start(): Unit
def stop(): Unit

此外,还有提交和撤销Task集的命令:

def submitTasks(taskSet: TaskSet): Unit
def cancelTasks(stageId: Int, interruptThread: Boolean)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java成长之路

三、JVM之对象的创建

上篇博文中已经介绍过了jvm内存的概况,接下来我们从jvm的角度来重新来认识一下Java对象是如何创建。 Java是一门面向对象的语言,在Java程序运行的...

652
来自专栏区块链入门

【易错概念】以太坊存储类型(memory,storage)及变量存储详解

在 Solidity 中,有两个地方可以存储变量 :存储(storage)以及内存(memory)。Storage变量是指永久存储在区块链中的变量。Memory...

462
来自专栏架构说

缓存策略之LRU实现及分析

LRU定义 Cache的容量有限,因此当Cache的容量用完后,而又有新的内容需要添加进来时, 就需要挑选并舍弃原有的部分内容,从而腾出空间...

30210
来自专栏腾讯云数据库(TencentDB)

【腾讯云CDB】源码分析 · MySQL binlog组提交和Multi-Threaded-Slave

MySQL 5.6引入了基于schema的并行复制,即如果binlog events操作的是不同schema的对象,不是DDL,且操作的对象没有对其他schem...

8371
来自专栏Java编程技术

JDK8中新增原子性操作类LongAccumulator

LongAdder类是LongAccumulator的一个特例,LongAccumulator提供了比LongAdder更强大的功能,如下构造函数其中accum...

822
来自专栏Android相关

处理器结构--ReorderBuffer

Reorder Buffer用来保存在乱序执行之前的(OOOE)指令执行顺序,当指令集合在乱序执行后按照原有指令顺序将结果提交。

1014
来自专栏腾讯技术工程官方号的专栏

鹅厂 TDSQL XA 事务隔离级别的奥秘

TDSQL XA 全局事务(global transaction)是指用户客户端连接到 TDSQL XA 分布式数据库系统后发起和执行的事务,也就是 TDSQL...

2882
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

41612
来自专栏腾讯数据库技术

减少MySQL主从延迟的神器--并行复制大揭密

2303
来自专栏机器学习从入门到成神

海量数据处理之Bti-map详解

    所谓的Bit-map就是用一个bit位来标记某个元素对应的Value, 而Key即是该元素。由于采用了Bit为单位来存储数据,因此在存储空间方面,可以...

751

扫码关注云+社区