Spark作业调度

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

(1)Driver的作用: 用于将所有要处理的RDD的操作转化为DAG,并且根据RDD DAG将JBO分割为多个Stage,最后生成相应的task,分发到各个Executor执行.

流程:sc.runJob -> DAGScheduler.runJob ->submitJob ->DAGEventProcessActor ->dagScheduler.handleJobSubmitted ->submitStage ->submitMissingTasks ->taskScheduler.submitTasks -> schedulerBackend.reviveOffers ->ReviveOffers ->DriverActor ->makeOffers -> resourceOffers ->launchTasks ->CoarseGrainedExecutorBackend(Executor)

其中handleJobSubmitted和submitStage主要负责依赖性分析,生成finalStage,根据finalStage来生成job.

源码newStage用来创建一个新的Stage

private def newStage(
        rdd:RDD[],
        numTasks: Int,
        shuffleDep: Option[ShuffleDependency[_,_,_]],
        jobId:Int,
        callSite:CallSite)
    :stage =
    {
        val id = nextStageId.getAndIncrement()
        val stage = new Stage(id,rdd,numTasks,shuffleDep,getParentStages(rdd,jobId),jobId,callSite)
        stageIdToStage(id) = stage 
        updateJobIdStageIdMaps(jobId,stage)
        stageToInfos(stage) = StageInfo.fromStage(stage)
        stage
}

spark在创建一个Stage之前,必须知道该Stage需要从多少个Partition读入数据,据此来创建Task数。源码Stage:

private[spark] class stage(
    val id:Int //stage的序号越大,数值越大
    val rdd: RDD[_], //归属于本stage的最后一个rdd
    val numTasks:Int, //创建的Task的数目,等于父rdd的输出Partition数目
    
    val shuffleDep:Option[ShuffleDependency[_,_,_]],//是否存在shuffle
    val parents:List[Stage],//父stage列表
    val jobId:Int,//作业id
    val callSite:CallSite)

Stage的划分的重要依据就在于是否有Shuffle操作,既宽依赖(RDD的宽依赖和窄依赖请参考前文,或者百度- -),如果有,则创建一个新的stage.Stage的划分完毕就明确了很多内容了,如下:

(1)产生的stage需要从多少个Partition中读取数据

(2)产生的stage会生成多少个Partition

(3)产生的stage是否属于shuffle

当确认了有多少个Partition,其实就确认了有多少个task。

当作业提交及执行期间,Spark集群中存在大量的消息的交互,所以使用AKKA 进行消息的接收,消息的处理和消息的发送。

下面开始在各个Executor中执行Task。然而Task又被分为ShuffleMapTask和ResultTask两种,相当于Hadoop的Map和Reduce.每个Stage根据isShuffleMap来标记确定Task类型,来区分ShuffleMapTask和ResultTask.一旦task类型和数量确定,下来就分发到各个executor,由Executor启动县城来执行。(从计划到执行)

TaskschedulerImple发送ReviveOffers消息给DriverActor,DriverActor在收到ReviveOffers消息后,调用makeOffers函数进行处理。源码如下:

def makeOffers(){
    launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map{case(id,host)=>new WorkerOffer(id,host,freeCores(id))}))

makeOffers函数主要用来找寻空闲的Executor,随机分发,尽可能的将任务平摊到各个executor中。发现有空闲的Executor,将任务列表中的部分任务利用launchTasks发送给制定的Executor.Task执行完毕.

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏行者悟空

Spark DAG调度

16230
来自专栏码匠的流水账

聊聊jdbc的batch操作

statement的batch操作,可以批量进行insert或update操作,提升操作性能,特别是在大数据量的insert或update的时候。

12820
来自专栏牛肉圆粉不加葱

Spark Shuffle 模块② - Hash Based Shuffle write

Spark 最开始只有 Hash Based Shuffle,因为在很多场景中并不需要排序,在这些场景中多余的排序反而会损耗性能。

9410
来自专栏Spark生态圈

[spark] RDD缓存源码解析

我们可以利用不同的存储级别存储每一个被持久化的RDD。可以存储在内存中,也可以序列化后存储在磁盘上等方式。Spark也会自动持久化一些shuffle操作(如re...

24830
来自专栏Albert陈凯

2018-11-07 Spark应用程序开发参数调优深入剖析-Spark商业调优实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归作者(秦凯新)所有...

12640
来自专栏Albert陈凯

Spark系列课程-00xxSpark RDD持久化

我们这节课讲一下RDD的持久化 ? RDD的持久化 这段代码我们上午已经看过了,有瑕疵大家看出来了吗? 有什么瑕疵啊? 大家是否还记得我在第二节课的时候跟大...

41480
来自专栏木东居士的专栏

用MPI进行分布式内存编程(入门篇)

47530
来自专栏Albert陈凯

3.4 RDD的计算

3.4 RDD的计算 3.4.1 Ta s k简介 原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为...

371100
来自专栏LhWorld哥陪你聊算法

【Spark篇】---Spark中控制算子

Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。

10630
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

501120

扫码关注云+社区

领取腾讯云代金券