前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Executor源码分析

Executor源码分析

原创
作者头像
幽鸿
发布2020-05-06 19:26:12
3890
发布2020-05-06 19:26:12
举报

Executor源码的最主要代码是TaskRunner,TaskRunner是一个多线程,首先看其runner方法:

代码语言:javascript
复制
//对序列化的task数据,进行反序列化
Executor.taskDeserializationProps.set(taskDescription.properties)
//通过网络通信,将需要的文件、资源、jar拷贝过来
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)

1、CoarseGrainedExecutorBackend

worker中为application启动的executor,实际上是启动了CoarseGrainedExecutorBackend进程,CoarseGrainedExecutorBackend的启动方法:

代码语言:javascript
复制
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}

2、Executor注册机制

在CoarseGrainedExecutorBackend中调用注册RegisterExecutor方法

3、启动task机制

代码语言:javascript
复制
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}

在Executor中launchTask,是调用TaskRunner多线程的。TaskRunner是采用java实现的多线程,并且这里使用了java线程池ThreadPool来管理task.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档