前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark作业调度

Spark作业调度

作者头像
用户3003813
发布2018-09-06 13:12:26
7080
发布2018-09-06 13:12:26
举报
文章被收录于专栏:个人分享个人分享

    Spark在任务提交时,主要存在于Driver和Executor的两个节点.

(1)Driver的作用: 用于将所有要处理的RDD的操作转化为DAG,并且根据RDD DAG将JBO分割为多个Stage,最后生成相应的task,分发到各个Executor执行.

流程:sc.runJob -> DAGScheduler.runJob ->submitJob ->DAGEventProcessActor ->dagScheduler.handleJobSubmitted ->submitStage ->submitMissingTasks ->taskScheduler.submitTasks -> schedulerBackend.reviveOffers ->ReviveOffers ->DriverActor ->makeOffers -> resourceOffers ->launchTasks ->CoarseGrainedExecutorBackend(Executor)

其中handleJobSubmitted和submitStage主要负责依赖性分析,生成finalStage,根据finalStage来生成job.

源码newStage用来创建一个新的Stage

代码语言:javascript
复制
private def newStage(
        rdd:RDD[],
        numTasks: Int,
        shuffleDep: Option[ShuffleDependency[_,_,_]],
        jobId:Int,
        callSite:CallSite)
    :stage =
    {
        val id = nextStageId.getAndIncrement()
        val stage = new Stage(id,rdd,numTasks,shuffleDep,getParentStages(rdd,jobId),jobId,callSite)
        stageIdToStage(id) = stage 
        updateJobIdStageIdMaps(jobId,stage)
        stageToInfos(stage) = StageInfo.fromStage(stage)
        stage
}

spark在创建一个Stage之前,必须知道该Stage需要从多少个Partition读入数据,据此来创建Task数。源码Stage:

代码语言:javascript
复制
private[spark] class stage(
    val id:Int //stage的序号越大,数值越大
    val rdd: RDD[_], //归属于本stage的最后一个rdd
    val numTasks:Int, //创建的Task的数目,等于父rdd的输出Partition数目
    
    val shuffleDep:Option[ShuffleDependency[_,_,_]],//是否存在shuffle
    val parents:List[Stage],//父stage列表
    val jobId:Int,//作业id
    val callSite:CallSite)

Stage的划分的重要依据就在于是否有Shuffle操作,既宽依赖(RDD的宽依赖和窄依赖请参考前文,或者百度- -),如果有,则创建一个新的stage.Stage的划分完毕就明确了很多内容了,如下:

(1)产生的stage需要从多少个Partition中读取数据

(2)产生的stage会生成多少个Partition

(3)产生的stage是否属于shuffle

当确认了有多少个Partition,其实就确认了有多少个task。

当作业提交及执行期间,Spark集群中存在大量的消息的交互,所以使用AKKA 进行消息的接收,消息的处理和消息的发送。

下面开始在各个Executor中执行Task。然而Task又被分为ShuffleMapTask和ResultTask两种,相当于Hadoop的Map和Reduce.每个Stage根据isShuffleMap来标记确定Task类型,来区分ShuffleMapTask和ResultTask.一旦task类型和数量确定,下来就分发到各个executor,由Executor启动县城来执行。(从计划到执行)

TaskschedulerImple发送ReviveOffers消息给DriverActor,DriverActor在收到ReviveOffers消息后,调用makeOffers函数进行处理。源码如下:

代码语言:javascript
复制
def makeOffers(){
    launchTasks(scheduler.resourceOffers(
    executorHost.toArray.map{case(id,host)=>new WorkerOffer(id,host,freeCores(id))}))

makeOffers函数主要用来找寻空闲的Executor,随机分发,尽可能的将任务平摊到各个executor中。发现有空闲的Executor,将任务列表中的部分任务利用launchTasks发送给制定的Executor.Task执行完毕.

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2015-10-23 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档