📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
pod 是 k8s 调度的最小单位,也就是整个 k8s 的基础之一,那么如何创建 pod 就是我们今天的关键了。这也是为什么我将它放在第一章的原因。
想看 k8s 源码,我不知如何下手,肯定是挑最熟悉最基础的部分,pod 肯定就是其中之一。而且日常的使用也让我们更熟悉 pod 的生命周期,所以我准备从 pod 入手。那么我知道 kubelet
作为操作 pod 的关键,那肯定就是代码的重点。于是我直接在代码中搜 kubelet
,找到对应文件名称为 kubelet
的文件,应该就是我们今天的目标了。
pkg/kubelet/kubelet.go
然后开始聚焦,由于源码很多,不可能面面俱到,所以一开始我们就要设定范围,看什么,不看什么。而我们今天的目标就是 pod 的创建 其他都和我们没有关系。所以,kubelet
本身的初始化等其他细节我们看到就略过。
看源码之前都自己先提出一些问题,这些问题能帮助我们更快的进入状态,以便能快速定位到所需的关键。
由于是第一篇,我就把详细的寻找过程也写进来,给小白提供思路。可略过。
pkg/kubelet/kubelet.go
shift + command + -
折叠所有方法我第一眼发下的三个方法:
HandlePodAdditions
HandlePodUpdates
HandlePodRemoves
显然这些方法是用于操作 pod 的,再看了一眼注释没错,那我们先看 HandlePodAdditions
。
通过 IDE command+点击方法名称可以看到哪里调用了这个方法
我习惯先看向上的链路,也就是是谁调用的这个方法,整个链路很清晰:
Run -> syncLoop -> syncLoopIteration -> HandlePodAdditions
然后简单查看一下 syncLoop
,从这里我们就可以理解到,kubelet 本质处理模式就是事件循环处理。启动之后通过一个 syncLoop
来不断循环处理过来的事件,在 syncLoopIteration
中根据不同的事件类型通过不同的方法处理事件,从而完成对 pod 的操作。下面的代码就描述了对于不同事件的处理:
// pkg/kubelet/kubelet.go:2387
switch u.Op {
case kubetypes.ADD:
klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// After restarting, kubelet will get all existing pods through
// ADD as if they are new pods. These pods will then go through the
// admission process and *may* be rejected. This can be resolved
// once we have checkpointing.
handler.HandlePodAdditions(u.Pods)
case kubetypes.UPDATE:
klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
klog.V(2).InfoS("SyncLoop REMOVE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
klog.V(4).InfoS("SyncLoop RECONCILE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
klog.V(2).InfoS("SyncLoop DELETE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
klog.ErrorS(nil, "Kubelet does not support snapshot update")
default:
klog.ErrorS(nil, "Invalid operation type received", "operation", u.Op)
}
然后再看方法的内部,精简后,主要逻辑就是下面这样:
// pkg/kubelet/kubelet.go:2506
for _, pod := range pods {
kl.podManager.AddPod(pod)
kl.podWorkers.UpdatePod(UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: kubetypes.SyncPodCreate,
StartTime: start,
})
}
在 podManager
中添加,在 podWorkers
中更新。也就是 kubelet
有两个帮手:podManager
和 podWorkers
。
那么接下来的 UpdatePod
就“有你好看”了,通常第一次看源码容易迷失的大多数原因就来源于大量的代码被吓怕了。还是那句话,我是来看 pod 如何创建的。所以其他的什么 if 判断全部都可以扔掉,因为它们都是在处理 pod 的其他状态,对于创建无关。
// pkg/kubelet/pod_workers.go:926
// start the pod worker goroutine if it doesn't exist
podUpdates, exists := p.podUpdates[uid]
if !exists {
// spawn a pod worker
go func() {
p.podWorkerLoop(uid, outCh)
}()
}
// notify the pod worker there is a pending update
status.pendingUpdate = &options
status.working = true
klog.V(4).InfoS("Notifying pod of pending update", "pod", klog.KRef(ns, name), "podUID", uid, "workType", status.WorkType())
select {
case podUpdates <- struct{}{}:
default:
}
根据这样的流程,你可以按照下面的路径开始理解和寻觅:
pkg/kubelet/pod_workers.go:1213
pkg/kubelet/pod_workers.go:1285
pkg/kubelet/kubelet.go:1687
pkg/kubelet/kubelet.go:1934
pkg/kubelet/kuberuntime/kuberuntime_container.go:177
// SyncPod syncs the running pod into the desired pod by executing following steps://
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Resize running containers (if InPlacePodVerticalScaling==true)
// 8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(
SyncPod
的注释写的很清楚,步骤 123… ,这就是我们所说的 pod 的创建过程,有关 sandbox 我们稍后文章再说,你可以简单理解为这里在创建 pod 所需要的环境。其中我们关注到两个步骤:
而创建容器的方法是 startContainer
:
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(
同样的,注释步骤很清晰,就是拉取镜像、创建镜像、启动,而这些最终的操作都落到了 ContainerManager
具体会在 vendor/k8s.io/cri-api/pkg/apis/services.go:34
也就是我们常说的 CRI
了。
kubelet
怎么知道要创建 pod 的?syncLoop
中有 updates chan
这个通道传递了 kubetypes.PodUpdate
事件,有事件(创建的事件)来的时候就会创建 pod。kubelet
本身去操作 CRI 的吗?还是有别人的帮助?kubelet
有 kubeGenericRuntimeManager
其中有 RuntimeService
也就是 ContainerManager
也就是最终得 CRI。下面这些,这些就是不看源码所很难了解到的内部细节了,虽然不影响整体理解,但可以作为额外扩展来学习一下。
在
HandlePodAdditions
创建的过程中出现了一个方法是:GetPodAndMirrorPod
,那么什么是MirrorPod
呢?
kubelet
会为每个 静态 pod 创建一个 MirrorPod
,而静态 pod 直接由 kubelet
管理,而不交给 apiserver
。如果 静态 pod 出现 crashes 那么 kubelet
会直接重启。而通过 kubectl get pod
看到的就是 MirrorPod
。其主要功能还是为了k8s 内部一些实现和操作方便,所以我们平常不需要知道它,也就是为什么看源码才知道,平常不知道的原因。
Static Pod 生命是 kubelet 控制的 普通的 Pod 生命是 control plane 控制的 那怎么让 APIserver 能看到它呢?答案就是 MirrorPod
如果还是不理解,我总结的不一定完整,建议看原文的参考文档:
事件循环,这是一个非常常见的设计,就是如同 kubelet
一样,通过 syncLoop
来不断循环来读取事件,通过不同类型的事件来执行对应操作。这样的优点就是解耦,并且职责清晰,还能通过事件类型来不断地扩展相对应的功能。当然其中配合 go 中 channel 和 select 写起来更加舒适。
小细节,我觉得缺少函数式编程的耳濡目染很难写出这样的代码:在 SyncPod
方法中,启动容器被封装成了一个内置函数 start
通过这样来共享了 pod 的相关配置,如果抽离成一个新的函数,参数会很多,又要封装新的对象,不划算。故,这样的写法,值得学习和感染一下。
# pkg/kubelet/kuberuntime/kuberuntime_manager.go:1225
start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {