Viewed against the overall Spark job execution flow, job scheduling splits into two levels: stage scheduling and task scheduling.
Stage division and scheduling happen at the logical level of a Spark job and never touch physical cluster resources, so we do not need to concern ourselves with them here.
Whether the resource provider is YARN or k8s, what we care about is how the underlying compute resources they offer are allocated (Executors) and consumed (Tasks). The main job of Spark's k8s module is to manage the number and lifecycle of executor pods, and to initiate task scheduling on the active pods.
"Initiate" is deliberate: all task scheduling decisions are made by TaskSchedulerImpl; the spark-k8s scheduling module (likewise for YARN) merely triggers the call at the right moment, and TaskScheduler ultimately decides which concrete task runs on which concrete Executor (Pod). In other words, TaskScheduler is cluster-manager-agnostic.
To sum up, the Spark k8s scheduling module has two jobs: manage the number and lifecycle of executor pods, and initiate task scheduling on the active pods at the right moments.
The whole scheduling system follows a publish-subscribe, producer-consumer pattern, comparable to message systems such as Kafka. Following the module interaction flow, it breaks down into producers (event sources that report pod state), a store (which buffers pod snapshots), and consumers (subscribers that react to the snapshots).
Below is the documentation of the store class, ExecutorPodsSnapshotsStoreImpl:
Controls the propagation of the Spark application's executor pods state to subscribers that
react to that state.
Roughly follows a producer-consumer model. Producers report states of executor pods, and these
states are then published to consumers that can perform any actions in response to these states.
Producers push updates in one of two ways. An incremental update sent by updatePod() represents
a known new state of a single executor pod. A full sync sent by replaceSnapshot() indicates that
the passed pods are all of the most up to date states of all executor pods for the application.
The combination of the states of all executor pods for the application is collectively known as
a snapshot. The store keeps track of the most up to date snapshot, and applies updates to that
most recent snapshot - either by incrementally updating the snapshot with a single new pod state,
or by replacing the snapshot entirely on a full sync.
Consumers, or subscribers, register that they want to be informed about all snapshots of the
executor pods. Every time the store replaces its most up to date snapshot from either an
incremental update or a full sync, the most recent snapshot after the update is posted to the
subscriber's buffer. Subscribers receive blocks of snapshots produced by the producers in
time-windowed chunks. Each subscriber can choose to receive their snapshot chunks at different
time intervals.
The subscriber notification callback is guaranteed to be called from a single thread at a time.
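To make the time-windowed buffering concrete, here is a minimal, hypothetical sketch of the mechanism described above (not the actual Spark implementation; ToySnapshotsStore and its members are invented for illustration): each subscriber gets its own queue, and a scheduled task drains that queue at the subscriber's chosen interval.

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.JavaConverters._

// Toy model of the store's subscriber buffering (illustrative only).
class ToySnapshotsStore[S] {
  private val scheduler = Executors.newScheduledThreadPool(1)

  def addSubscriber(intervalMillis: Long)(onNewSnapshots: Seq[S] => Unit): LinkedBlockingQueue[S] = {
    val buffer = new LinkedBlockingQueue[S]()
    val drain: Runnable = () => {
      val chunk = new java.util.ArrayList[S]()
      buffer.drainTo(chunk) // everything published since the last tick
      if (!chunk.isEmpty) onNewSnapshots(chunk.asScala.toSeq) // one thread at a time
    }
    scheduler.scheduleWithFixedDelay(drain, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
    buffer // producers offer() fresh snapshots into this queue
  }
}

The real store additionally keeps the authoritative current snapshot and pushes an updated copy into every subscriber's buffer on each updatePod / replaceSnapshot call.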
Following the module order, here is each module's class and what it does.
The producers continuously fetch ExecutorPod state from k8s and report it to the store module. Anyone familiar with the k8s client will recognize its list-watch mechanism: put simply, it tracks k8s resource state with a combination of periodic full syncs and real-time incremental updates. The producers here work the same way, and there are two kinds of synchronization, incremental and full:
Incremental sync: the watcher reports through snapshotsStore.updatePod. Code snippet:
// register a watch for pod events
watchConnection = kubernetesClient.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.watch(new ExecutorPodsWatcher())
// event handling: call snapshotsStore to apply the update
override def eventReceived(action: Action, pod: Pod): Unit = {
val podName = pod.getMetadata.getName
logDebug(s"Received executor pod update for pod named $podName, action $action")
snapshotsStore.updatePod(pod)
}
Full sync: the poller reports through snapshotsStore.replaceSnapshot. Code snippet:
// a dedicated thread periodically fetches the full pod state on a fixed schedule
def start(applicationId: String): Unit = {
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
}
// the polling runnable's logic
private class PollRunnable(applicationId: String) extends Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
logDebug(s"Resynchronizing full executor pod state from Kubernetes.")
snapshotsStore.replaceSnapshot(kubernetesClient
.pods()
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withoutLabel(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
.list()
.getItems
.asScala.toSeq)
}
}
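The pollingInterval above comes from Spark configuration; to my knowledge the key in current Spark versions is spark.kubernetes.executor.apiPollingInterval (30s by default), so the resync cadence can be tuned as in this sketch (verify the key against your Spark release):

import org.apache.spark.SparkConf

object PollingIntervalExample extends App {
  // Run the full replaceSnapshot resync every 60 seconds instead of the 30s default.
  val conf = new SparkConf().set("spark.kubernetes.executor.apiPollingInterval", "60s")
  println(conf.get("spark.kubernetes.executor.apiPollingInterval")) // 60s
}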
The main responsibilities of the store module (ExecutorPodsSnapshotsStoreImpl) are captured by its trait:
private[spark] trait ExecutorPodsSnapshotsStore {
def addSubscriber
(processBatchIntervalMillis: Long)
(onNewSnapshots: Seq[ExecutorPodsSnapshot] => Unit): Unit
def stop(): Unit
def notifySubscribers(): Unit
def updatePod(updatedPod: Pod): Unit
def replaceSnapshot(newSnapshot: Seq[Pod]): Unit
}
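Registering a consumer then looks roughly like this (a sketch modeled on what ExecutorPodsAllocator.start does; handleSnapshot is a hypothetical handler, and in Spark these types are private[spark], so this is illustrative rather than something callable from user code):

// store: the ExecutorPodsSnapshotsStore above; intervalMillis: this subscriber's batch window
def subscribe(store: ExecutorPodsSnapshotsStore, intervalMillis: Long)
             (handleSnapshot: ExecutorPodsSnapshot => Unit): Unit = {
  store.addSubscriber(intervalMillis) { snapshots =>
    // invoked from a single thread, with every snapshot produced since the previous call
    snapshots.foreach(handleSnapshot)
  }
}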
From the functional perspective of task scheduling, the consumers cover two concerns: keeping the executor pod count on target, and managing pod lifecycle.
Main implementation class: ExecutorPodsAllocator, whose core method is onNewSnapshots. When the pod count falls short of the target it creates pods to cover the gap; when it exceeds the target it deletes pods. Code snippets follow.
// logic for deleting excess executor pod requests
if (knownPodCount > targetNum) {
val excess = knownPodCount - targetNum
val knownPendingToDelete = currentPendingExecutors
.filter(x => isExecutorIdleTimedOut(x._2, currentTime))
.map { case (id, _) => id }
.take(excess - newlyCreatedExecutorsForRpId.size)
val toDelete = newlyCreatedExecutorsForRpId
.filter { case (_, (_, createTime)) =>
currentTime - createTime > executorIdleTimeout
}.keys.take(excess).toList ++ knownPendingToDelete
if (toDelete.nonEmpty) {
logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
_deletedExecutorIds = _deletedExecutorIds ++ toDelete
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= toDelete
knownPendingCount -= knownPendingToDelete.size
}
}
}
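A quick worked example of the deletion arithmetic above, with hypothetical numbers:

object ExcessPodsExample extends App {
  val knownPodCount = 10     // pods the driver currently tracks for this ResourceProfile
  val targetNum = 6          // desired executor count
  val newlyCreatedCount = 1  // requests sent to k8s but not yet visible in a snapshot
  val excess = knownPodCount - targetNum          // 4 pods over target
  val pendingQuota = excess - newlyCreatedCount   // up to 3 idle pending pods may be deleted
  println(s"excess=$excess, idle pending pods eligible=$pendingQuota")
}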
// logic for creating executor pods
for ( _ <- 0 until numExecutorsToAllocate) {
val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet()
val executorConf = KubernetesConf.createExecutorConf(
conf,
newExecutorId.toString,
applicationId,
driverPod,
resourceProfileId)
val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
kubernetesClient, rpIdToResourceProfile(resourceProfileId))
val executorPod = resolvedExecutorSpec.pod
val podWithAttachedContainer = new PodBuilder(executorPod.pod)
.editOrNewSpec()
.addToContainers(executorPod.container)
.endSpec()
.build()
val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
Implementation class: ExecutorPodsLifecycleManager. Its onNewSnapshots method mainly handles pods in the deleted, failed, succeeded, and inactive states, in two steps:
i. removeExecutorFromSpark: drop the executor from the driver's bookkeeping
schedulerBackend.doRemoveExecutor(execId.toString, exitReason)
driverEndpoint.send(RemoveExecutor(executorId, reason))
ii. removeExecutorFromK8s: delete the pod from k8s
kubernetesClient
.pods()
.withName(updatedPod.getMetadata.getName)
.delete()
Implementation class: KubernetesClusterManager. Its main methods:
// create the SchedulerBackend
override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend = {
new KubernetesClusterSchedulerBackend(
scheduler.asInstanceOf[TaskSchedulerImpl],
sc,
kubernetesClient,
schedulerExecutorService,
snapshotsStore,
executorPodsAllocator,
executorPodsLifecycleEventHandler,
podsWatchEventSource,
podsPollingEventSource)
}
// initialize the TaskScheduler; once the taskScheduler is up and running, this message system starts turning on its own
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
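For context, SparkContext discovers KubernetesClusterManager via java.util.ServiceLoader over ExternalClusterManager implementations and picks the one whose canCreate accepts the master URL; for k8s that check is just a prefix test. The one-liner below mirrors the Spark source, wrapped in a runnable example:

object MasterUrlDispatchExample extends App {
  // Mirrors KubernetesClusterManager.canCreate: a plain prefix test on the master URL.
  def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")

  println(canCreate("k8s://https://kube-apiserver:6443")) // true  -> k8s manager chosen
  println(canCreate("yarn"))                              // false -> handled elsewhere
}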
KubernetesClusterManager instantiates all of the message system's objects, including KubernetesClusterSchedulerBackend. KubernetesClusterSchedulerBackend is what actually starts the whole message system, as follows:
override def start(): Unit = {
super.start()
val initExecs = Map(defaultProfile -> initialExecutors)
podAllocator.setTotalExpectedExecutors(initExecs)
lifecycleEventHandler.start(this)
podAllocator.start(applicationId())
watchEvents.start(applicationId())
pollEvents.start(applicationId())
setUpExecutorConfigMap()
}
The start method's logic: first build the default ResourceProfile instance defaultProfile, which carries the task-side resource requests and the executor-side requests (cpu, memory, overhead memory), and derive from the configuration how many executors defaultProfile needs. The defaultProfile -> initialExecutors mapping is then handed to the PodAllocator, which compares it against the pod count currently visible in the message system to decide whether executor pods should be started or deleted.
In addition, the start method publishes the configuration files under SPARK_CONF_DIR to the executor pods as a ConfigMap.
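How initialExecutors is derived depends on configuration; the sketch below mirrors the spirit of SchedulerBackendUtils.getInitialTargetExecutorNumber (the exact precedence rules live in the Spark source, so treat this as an approximation):

import org.apache.spark.SparkConf

object InitialExecutorsExample extends App {
  val conf = new SparkConf().set("spark.executor.instances", "4")
  // With dynamic allocation off, the static instance count wins (default 2).
  val initialExecutors =
    if (conf.getBoolean("spark.dynamicAllocation.enabled", defaultValue = false)) {
      conf.getInt("spark.dynamicAllocation.initialExecutors", 0)
    } else {
      conf.getInt("spark.executor.instances", 2)
    }
  println(initialExecutors) // 4
}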
KubernetesClusterSchedulerBackend extends CoarseGrainedSchedulerBackend, and CoarseGrainedSchedulerBackend implements two interfaces: ExecutorAllocationClient and SchedulerBackend.
To summarize: CoarseGrainedSchedulerBackend implements scheduling management both for Task, Spark's job-level abstraction, and for Executor, the compute-resource-level abstraction. A concrete ClusterManager such as k8s then implements how Executors are materialized: on k8s, adding an Executor really means having ExecutorPodsAllocator create the corresponding pod, and removing an Executor is a direct k8s-client pod deletion. YARN works analogously.
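Abridged from the Spark source, the inheritance chain described above looks like this (constructor parameters and bodies elided):

private[spark] class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend { /* ... */ }

private[spark] class KubernetesClusterSchedulerBackend(/* ... */)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) { /* ... */ }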
The journey of a task inside the driver, from creation to final dispatch, mainly involves these steps: TaskSchedulerImpl.submitTasks registers the TaskSet and calls the backend's reviveOffers; a ReviveOffers message reaches the driver endpoint, whose makeOffers assembles WorkerOffers from the alive executors; TaskSchedulerImpl.resourceOffers assigns tasks to those offers; and launchTasks serializes each TaskDescription and sends a LaunchTask message to the chosen executor's endpoint.
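A condensed sketch of that path inside CoarseGrainedSchedulerBackend's DriverEndpoint, written as annotated pseudocode (names follow the Spark source, bodies heavily abridged):

// 1. TaskSchedulerImpl.submitTasks(taskSet) registers a TaskSetManager,
//    then calls backend.reviveOffers().
// 2. reviveOffers() sends ReviveOffers to the driver endpoint:
//      case ReviveOffers => makeOffers()
// 3. makeOffers() turns alive executors into offers and lets the scheduler place tasks:
//      val workOffers = activeExecutors.map { case (id, data) =>
//        WorkerOffer(id, data.executorHost, data.freeCores)
//      }
//      launchTasks(scheduler.resourceOffers(workOffers)) // TaskSchedulerImpl decides placement
// 4. launchTasks() serializes each TaskDescription and ships it to the chosen executor:
//      executorData(task.executorId).executorEndpoint.send(LaunchTask(serialized))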