The core of the Executor source code is TaskRunner. TaskRunner is a Runnable, so each task executes on its own thread. Start with its run method:
// Set the task's properties before the serialized task data is deserialized
Executor.taskDeserializationProps.set(taskDescription.properties)
// Copy the required files, resources, and jars over the network
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
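The dependency step can be illustrated with a minimal sketch. It assumes that a file is fetched again only when the timestamp shipped with the task is newer than the copy already on the executor. `DependencySketch` and the `fetch` hook are hypothetical names for illustration, not Spark's API.

```scala
import scala.collection.mutable

object DependencySketch {
  // name -> timestamp of the copy this executor has already fetched
  private val currentFiles = mutable.HashMap.empty[String, Long]

  // Fetches every file whose timestamp is newer than the local copy and
  // returns the fetched names in sorted order.
  // `fetch` is a hypothetical download hook (assumption, not a Spark call).
  def updateDependencies(newFiles: Map[String, Long])(fetch: String => Unit): Seq[String] =
    this.synchronized {
      val stale = newFiles.toSeq
        .filter { case (name, timestamp) => currentFiles.getOrElse(name, -1L) < timestamp }
        .sortBy(_._1)
      stale.foreach { case (name, timestamp) =>
        fetch(name)
        currentFiles(name) = timestamp
      }
      stale.map(_._1)
    }
}
```

Calling it twice with the same map fetches nothing the second time; only an entry with a newer timestamp triggers another fetch.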
1. CoarseGrainedExecutorBackend
The executor that a worker launches for an application is in fact a CoarseGrainedExecutorBackend process. Its startup method, onStart, is:
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
      // Always receive `true`. Just ignore it
    case Failure(e) =>
      exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
  }(ThreadUtils.sameThread)
}
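2. Executor registration mechanism
CoarseGrainedExecutorBackend registers itself with the driver by sending a RegisterExecutor message, which is the ref.ask call in onStart.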
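The look-up-then-register flow can be sketched with plain Scala futures. This is a simplified illustration, not Spark's RPC layer: `FakeDriver`, `RegisterExecutorMsg`, `RegistrationSketch`, and `registerAndWait` are hypothetical stand-ins, and the failure case returns a message instead of exiting the process.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical registration message; Spark's RegisterExecutor carries more fields.
final case class RegisterExecutorMsg(executorId: String, hostname: String, cores: Int)

// Hypothetical driver stand-in that records executor ids and replies true.
class FakeDriver {
  @volatile var registered: Set[String] = Set.empty

  def ask(msg: RegisterExecutorMsg)(implicit ec: ExecutionContext): Future[Boolean] = Future {
    this.synchronized {
      registered += msg.executorId
      true
    }
  }
}

object RegistrationSketch {
  // Resolve the driver first, then send the registration request to it.
  def register(lookup: Future[FakeDriver], msg: RegisterExecutorMsg)
              (implicit ec: ExecutionContext): Future[String] =
    lookup
      .flatMap(driver => driver.ask(msg))
      .map(_ => "registered") // the reply is always true, so it is ignored
      .recover { case NonFatal(e) => s"Cannot register with driver: ${e.getMessage}" }

  // Blocking helper for trying the sketch out; real code would stay asynchronous.
  def registerAndWait(lookup: Future[FakeDriver], msg: RegisterExecutorMsg): String =
    Await.result(register(lookup, msg)(ExecutionContext.global), 5.seconds)
}
```

As in onStart, the request is chained onto the lookup with flatMap, so a failed lookup and a failed request both end up in the same failure branch.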
3. Task launch mechanism
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
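In Executor, launchTask wraps each task in a TaskRunner and submits it to a thread pool. TaskRunner implements Java's Runnable interface, and the Java thread pool threadPool manages the threads on which tasks run.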
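The same pattern can be tried outside Spark with a minimal sketch. `MiniExecutor`, `SimpleTaskRunner`, and `SimpleTaskDescription` are hypothetical names, the task body is a placeholder, and the cached thread pool is an assumption made for this example.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical, simplified stand-in for Spark's TaskDescription.
final case class SimpleTaskDescription(taskId: Long, name: String)

class MiniExecutor {
  // Tasks currently running, indexed by task id (mirrors runningTasks above).
  val runningTasks = new ConcurrentHashMap[Long, SimpleTaskRunner]()
  // Results are stored only so the sketch has something observable.
  val finished = new ConcurrentHashMap[Long, String]()
  // Assumption: a cached pool, which creates threads on demand and reuses idle ones.
  private val threadPool = Executors.newCachedThreadPool()

  // One Runnable per task; the pool decides which thread executes it.
  class SimpleTaskRunner(desc: SimpleTaskDescription) extends Runnable {
    override def run(): Unit = {
      try {
        finished.put(desc.taskId, s"${desc.name} done") // placeholder for real task work
      } finally {
        runningTasks.remove(desc.taskId) // always unregister, even if the task fails
      }
    }
  }

  def launchTask(desc: SimpleTaskDescription): Unit = {
    val tr = new SimpleTaskRunner(desc)
    runningTasks.put(desc.taskId, tr)
    threadPool.execute(tr)
  }

  // Stops accepting tasks and waits for the submitted ones to finish.
  def shutdown(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Registering the runner in the map before handing it to the pool means the runner's own cleanup can never run ahead of the registration.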
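Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.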