转载请声明出处哦~,本篇文章发布于luozhiyun的博客:https://www.luozhiyun.com,源码版本是1.19
又是一个周末,可以愉快的坐下来静静的品味一段源码,这一篇涉及到资源的回收,工作量是很大的,篇幅会比较长,我们可以看到k8s在资源不够时会怎么做的,k8s在回收资源的时候有哪些考虑,我们的pod为什么会无端端的被干掉等等。
在k8s中,CPU和内存的资源主要是通过这limit&request来进行限制的,在yaml文件中的定义如下:
spec.containers[].resources.limits.cpu
spec.containers[].resources.limits.memory
spec.containers[].resources.requests.cpu
spec.containers[].resources.requests.memory
在调度的时候,kube-scheduler 只会按照 requests 的值进行计算,而真正限制资源使用的是limit。
下面我引用一个官方的例子:
apiVersion: v1
kind: Pod
metadata:
name: cpu-demo
namespace: cpu-example
spec:
containers:
- name: cpu-demo-ctr
image: vish/stress
resources:
limits:
cpu: "1"
requests:
cpu: "0.5"
args:
- -cpus
- "2"
在这个例子中,args参数给的是cpus等于2,表示这个container可以使用2个cpu进行压测。但是我们的limits是1,以及requests是0.5。
当我们创建好这个pod之后,然后使用kubectl top去查看资源使用情况的时候会发现cpu使用并不会超过1:
NAME CPU(cores) MEMORY(bytes)
cpu-demo 974m <something>
这说明这个pod的cpu资源被限制在了1个cpu,即使container想使用,也是没有办法的。
在容器没有指定 request 的时候,request 的值和 limit 默认相等。
下面说一下由不同的 requests 和 limits 的设置方式引出的不同的 QoS 级别。
kubernetes 中有三种 Qos,分别为:
Guaranteed
:Pod中所有Container的所有Resource的limit
和request
都相等且不为0;
Burstable
:pod不满足Guaranteed条件,但是其中至少有一个container设置了requests或limits ;
BestEffort
:pod的 requests 与 limits 均没有设置;
当宿主机资源紧张的时候,kubelet 对 Pod 进行 Eviction(即资源回收)时会按照Qos的顺序进行回收,回收顺序是:BestEffort>Burstable>Guaranteed
Eviction有两种模式,分为 Soft 和 Hard。Soft Eviction 允许你为 Eviction 过程设置grace period,然后等待一个用户配置的grace period之后,再执行Eviction,而Hard则立即执行。
那么什么时候会发生Eviction呢?我们可以为Eviction 设置threshold,比如设置设定内存的 eviction hard threshold 为 100M,那么当这台机器的内存可用资源不足 100M 时,kubelet 就会根据这台机器上面所有 pod 的 QoS 级别以及他们的内存使用情况,进行一个综合排名,把排名最靠前的 pod 进行迁移,从而释放出足够的内存资源。
thresholds定义方式为[eviction-signal][operator][quantity]
eviction-signal
eviction-signal按照官方文档的说法分为如下几种:
Eviction Signal | Description |
---|---|
memory.available | memory.available := node.status.capacity[memory] - node.stats.memory.workingSet |
nodefs.available | nodefs.available := node.stats.fs.available |
nodefs.inodesFree | nodefs.inodesFree := node.stats.fs.inodesFree |
imagefs.available | imagefs.available := node.stats.runtime.imagefs.available |
imagefs.inodesFree | imagefs.inodesFree := node.stats.runtime.imagefs.inodesFree |
nodefs和imagefs表示两种文件系统分区:
nodefs:文件系统,kubelet 将其用于卷和守护程序日志等。
imagefs:文件系统,容器运行时用于保存镜像和容器可写层。
operator
就是所需的关系运算符,如"<"。
quantity
是阈值的大小,可以容量大小,如:1Gi;也可以用百分比来表示:10%。
如果kubelet在节点经历系统 OOM 之前无法回收内存,那么oom_killer将基于它在节点上 使用的内存百分比算出一个oom_score,然后结束得分最高的容器。
qos的代码位于pkg\apis\core\v1\helper\qos\包下面:
qos#GetPodQOS
//pkg\apis\core\v1\helper\qos\qos.go
func GetPodQOS(pod *v1.Pod) v1.PodQOSClass {
requests := v1.ResourceList{}
limits := v1.ResourceList{}
zeroQuantity := resource.MustParse("0")
isGuaranteed := true
allContainers := []v1.Container{}
//追加所有的初始化容器
allContainers = append(allContainers, pod.Spec.Containers...)
allContainers = append(allContainers, pod.Spec.InitContainers...)
//遍历container
for _, container := range allContainers {
// process requests
//遍历request 里面的cpu、memory 获取其中的值
for name, quantity := range container.Resources.Requests {
if !isSupportedQoSComputeResource(name) {
continue
}
if quantity.Cmp(zeroQuantity) == 1 {
delta := quantity.DeepCopy()
if _, exists := requests[name]; !exists {
requests[name] = delta
} else {
delta.Add(requests[name])
requests[name] = delta
}
}
}
// process limits
qosLimitsFound := sets.NewString()
//遍历 limit 里面的cpu、memory 获取其中的值
for name, quantity := range container.Resources.Limits {
if !isSupportedQoSComputeResource(name) {
continue
}
if quantity.Cmp(zeroQuantity) == 1 {
qosLimitsFound.Insert(string(name))
delta := quantity.DeepCopy()
if _, exists := limits[name]; !exists {
limits[name] = delta
} else {
delta.Add(limits[name])
limits[name] = delta
}
}
}
//如果limits 没有同时设置cpu 、Memory,那么就不是Guaranteed
if !qosLimitsFound.HasAll(string(v1.ResourceMemory), string(v1.ResourceCPU)) {
isGuaranteed = false
}
}
//如果requests 和 limits都没有设置,那么为BestEffort
if len(requests) == 0 && len(limits) == 0 {
return v1.PodQOSBestEffort
}
// Check is requests match limits for all resources.
if isGuaranteed {
for name, req := range requests {
if lim, exists := limits[name]; !exists || lim.Cmp(req) != 0 {
isGuaranteed = false
break
}
}
}
// 都设置了limits 和 requests,则是Guaranteed
if isGuaranteed &&
len(requests) == len(limits) {
return v1.PodQOSGuaranteed
}
return v1.PodQOSBurstable
}
上面有注释我就不过多介绍,非常的简单。
下面这里是QOS OOM打分机制,通过给不同的pod打分来判断,哪些pod可以被优先kill掉,分数越高的越容易被kill。
policy
//\pkg\kubelet\qos\policy.go
// 分值越高越容易被kill
const (
// KubeletOOMScoreAdj is the OOM score adjustment for Kubelet
KubeletOOMScoreAdj int = -999
// KubeProxyOOMScoreAdj is the OOM score adjustment for kube-proxy
KubeProxyOOMScoreAdj int = -999
guaranteedOOMScoreAdj int = -998
besteffortOOMScoreAdj int = 1000
)
policy#GetContainerOOMScoreAdjust
//\pkg\kubelet\qos\policy.go
func GetContainerOOMScoreAdjust(pod *v1.Pod, container *v1.Container, memoryCapacity int64) int {
//静态Pod、镜像Pod和高优先级Pod,直接可以是guaranteedOOMScoreAdj
if types.IsCriticalPod(pod) {
// Critical pods should be the last to get killed.
return guaranteedOOMScoreAdj
}
//获取pod的qos等级,这里只处理Guaranteed与BestEffort
switch v1qos.GetPodQOS(pod) {
case v1.PodQOSGuaranteed:
// Guaranteed containers should be the last to get killed.
return guaranteedOOMScoreAdj
case v1.PodQOSBestEffort:
return besteffortOOMScoreAdj
}
memoryRequest := container.Resources.Requests.Memory().Value()
//如果我们占用的内存越少,则打分就越高
oomScoreAdjust := 1000 - (1000*memoryRequest)/memoryCapacity
//这里是为了保证burstable能有个更高的 OOM score
if int(oomScoreAdjust) < (1000 + guaranteedOOMScoreAdj) {
return (1000 + guaranteedOOMScoreAdj)
}
if int(oomScoreAdjust) == besteffortOOMScoreAdj {
return int(oomScoreAdjust - 1)
}
return int(oomScoreAdjust)
}
这个方法里面给不同的pod进行打分,静态Pod、镜像Pod和高优先级Pod,QOS直接被设置成为guaranteed;
然后调用qos的GetPodQOS方法获取一个pod的评分,但是如果一个pod是burstable,那么需要根据其直接使用的内存来进行评分,占用的内存越少,则打分就越高,如果分数小于1000 + guaranteedOOMScoreAdj,也就是2分,那么被直接设置成2分,避免分数过低。
kubelet在实例化一个kubelet对象的时候,调用eviction.NewManager
新建了一个evictionManager对象。然后kubelet再Run方法开始工作的时候,创建一个goroutine,每5s执行一次updateRuntimeUp。
在updateRuntimeUp中,待确认runtime启动成功后,会调用initializeRuntimeDependentModules完成runtime依赖模块的初始化工作。
然后在initializeRuntimeDependentModules中会调用evictionManager的start方法进行启动。
代码如下,具体的kubelet流程我们留到以后慢慢分析:
func NewMainKubelet(...){
...
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock, etcHostsPathFunc)
klet.evictionManager = evictionManager
...
}
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
...
}
func (kl *Kubelet) updateRuntimeUp() {
...
kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
...
}
func (kl *Kubelet) initializeRuntimeDependentModules() {
...
kl.evictionManager.Start(kl.StatsProvider, kl.GetActivePods, kl.podResourcesAreReclaimed, evictionMonitoringPeriod)
...
}
下面我们来到\pkg\kubelet\eviction\eviction_manager.go去看一下Start方法怎么实现eviction的。
managerImp#Start
// 开启一个控制循环去监视和响应资源过低的情况
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
klog.Infof(message)
m.synchronize(diskInfoProvider, podFunc)
}
//是否要利用kernel memcg notification
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
if err != nil {
klog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// start the eviction manager monitoring
// 启动一个goroutine,for循环里每隔monitoringInterval(10s)执行一次synchronize
go func() {
for {
//synchronize是主要的eviction控制循环,返回被kill的pod,或返回nill
if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
klog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
}
}()
}
下面的synchronize方法会很长,需要点耐心:
managerImpl#synchronize
1.根据上面介绍的不同的eviction signal会有不同的排序方法,以及设置节点资源回收方法
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
if m.dedicatedImageFs == nil {
hasImageFs, ok := diskInfoProvider.HasDedicatedImageFs()
if ok != nil {
return nil
}
m.dedicatedImageFs = &hasImageFs
//注册各个eviction signal所对应的资源排序方法
m.signalToRankFunc = buildSignalToRankFunc(hasImageFs)
// 注册节点资源回收方法,例如imagefs.avaliable对应的是删除无用容器和无用镜像
m.signalToNodeReclaimFuncs = buildSignalToNodeReclaimFuncs(m.imageGC, m.containerGC, hasImageFs)
}
...
}
看一下buildSignalToRankFunc方法的实现:
Copy
func buildSignalToRankFunc(withImageFs bool) map[evictionapi.Signal]rankFunc {
signalToRankFunc := map[evictionapi.Signal]rankFunc{
evictionapi.SignalMemoryAvailable: rankMemoryPressure,
evictionapi.SignalAllocatableMemoryAvailable: rankMemoryPressure,
evictionapi.SignalPIDAvailable: rankPIDPressure,
}
if withImageFs {
signalToRankFunc[evictionapi.SignalNodeFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalNodeFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
signalToRankFunc[evictionapi.SignalImageFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalImageFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot}, resourceInodes)
} else {
signalToRankFunc[evictionapi.SignalNodeFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalNodeFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
signalToRankFunc[evictionapi.SignalImageFsAvailable] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, v1.ResourceEphemeralStorage)
signalToRankFunc[evictionapi.SignalImageFsInodesFree] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}, resourceInodes)
}
return signalToRankFunc
}
这个方法里面会将各个eviction signal的排序方法放入到一个map中返回,如MemoryAvailable、NodeFsAvailable、ImageFsAvailable等。
2.获取所有的活跃的pod,以及整体的stat信息
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//获取当前active的pods
activePods := podFunc()
updateStats := true
//获取节点的整体概况,即nodeStsts和podStats
summary, err := m.summaryProvider.Get(updateStats)
if err != nil {
klog.Errorf("eviction manager: failed to get summary stats: %v", err)
return nil
}
//如果Notifiers有超过10s没有刷新,那么更新Notifiers
if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
m.thresholdsLastUpdated = m.clock.Now()
for _, notifier := range m.thresholdNotifiers {
if err := notifier.UpdateThreshold(summary); err != nil {
klog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
}
}
}
...
}
3.根据summary信息创建相应的统计信息到observations对象中
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//根据summary信息创建相应的统计信息到observations对象中,如SignalMemoryAvailable、SignalNodeFsAvailable等。
observations, statsFunc := makeSignalObservations(summary)
...
}
下面抽取部分代码makeSignalObservations
Copy
func makeSignalObservations(summary *statsapi.Summary) (signalObservations, statsFunc) {
...
if memory := summary.Node.Memory; memory != nil && memory.AvailableBytes != nil && memory.WorkingSetBytes != nil {
result[evictionapi.SignalMemoryAvailable] = signalObservation{
available: resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI),
capacity: resource.NewQuantity(int64(*memory.AvailableBytes+*memory.WorkingSetBytes), resource.BinarySI),
time: memory.Time,
}
}
...
}
这个方法主要是将summary里面的资源利用情况根据不同的eviction signal封装到result里面返回。
4. 根据获取的observations判断是否已到达阈值的thresholds
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//根据获取的observations判断是否已到达阈值的thresholds,然后返回
thresholds = thresholdsMet(thresholds, observations, false)
if len(m.thresholdsMet) > {
//Minimum eviction reclaim 策略
thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
...
}
thresholdsMet
Copy
func thresholdsMet(thresholds []evictionapi.Threshold, observations signalObservations, enforceMinReclaim bool) []evictionapi.Threshold {
results := []evictionapi.Threshold{}
for i := range thresholds {
threshold := thresholds[i]
observed, found := observations[threshold.Signal]
if !found {
klog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal)
continue
}
thresholdMet := false
// 根据资源容量获取阈值的资源大小
quantity := evictionapi.GetThresholdQuantity(threshold.Value, observed.capacity)
//Minimum eviction reclaim 策略,具体看:https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/#minimum-eviction-reclaim
if enforceMinReclaim && threshold.MinReclaim != nil {
quantity.Add(*evictionapi.GetThresholdQuantity(*threshold.MinReclaim, observed.capacity))
}
//如果observed.available比quantity大,那么返回1
thresholdResult := quantity.Cmp(*observed.available)
//检查Operator标识符
switch threshold.Operator {
//如果是小于号"<",当thresholdResult大于0,返回true
case evictionapi.OpLessThan:
thresholdMet = thresholdResult >
}
//如果append到results,表示已经到达阈值
if thresholdMet {
results = append(results, threshold)
}
}
return results
}
thresholdsMet会遍历整个thresholds,然后从observations里面获取eviction signal对应的资源情况。因为我们上面讲了设置的threshold可以是1Gi,也可以是百分比,所以需要调用GetThresholdQuantity方法换算一下,得到quantity;
然后根据Minimum eviction reclaim 策略判断一下是否还需要提高这个需要eviction的资源,具体的信息查看文档:https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/#minimum-eviction-reclaim;
然后用quantity和available比较一下,如果已达阈值,那么加入到results集合中返回。
5.记录eviction signal 第一次的时间,并将Eviction Signals映射到对应的Node Conditions
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
now := m.clock.Now()
//主要用来记录 eviction signal 第一次的时间,没有则设置 now 时间
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
// the set of node conditions that are triggered by currently observed thresholds
// Kubelet会将对应的Eviction Signals映射到对应的Node Conditions
nodeConditions := nodeConditions(thresholds)
if len(nodeConditions) > {
klog.V().Infof("eviction manager: node conditions - observed: %v", nodeConditions)
}
...
}
nodeConditions
Copy
func nodeConditions(thresholds []evictionapi.Threshold) []v1.NodeConditionType {
results := []v1.NodeConditionType{}
for _, threshold := range thresholds {
if nodeCondition, found := signalToNodeCondition[threshold.Signal]; found {
//检查results里是否已有nodeCondition
if !hasNodeCondition(results, nodeCondition) {
results = append(results, nodeCondition)
}
}
}
return results
}
nodeConditions方法主要就是根据signalToNodeCondition来映射对应的nodeCondition,其中nodeCondition如下:
Copy
signalToNodeCondition = map[evictionapi.Signal]v1.NodeConditionType{}
signalToNodeCondition[evictionapi.SignalMemoryAvailable] = v1.NodeMemoryPressure
signalToNodeCondition[evictionapi.SignalAllocatableMemoryAvailable] = v1.NodeMemoryPressure
signalToNodeCondition[evictionapi.SignalImageFsAvailable] = v1.NodeDiskPressure
signalToNodeCondition[evictionapi.SignalNodeFsAvailable] = v1.NodeDiskPressure
signalToNodeCondition[evictionapi.SignalImageFsInodesFree] = v1.NodeDiskPressure
signalToNodeCondition[evictionapi.SignalNodeFsInodesFree] = v1.NodeDiskPressure
signalToNodeCondition[evictionapi.SignalPIDAvailable] = v1.NodePIDPressure
也就是将Eviction Signals分别映射成了MemoryPressure或DiskPressure,整理出来的表格如下:
Node Condition | Eviction Signal | Description |
---|---|---|
MemoryPressure | memory.available | Available memory on the node has satisfied an eviction threshold |
DiskPressure | nodefs.available, nodefs.inodesFree, imagefs.available, or imagefs.inodesFree | Available disk space and inodes on either the node's root filesystem or image filesystem has satisfied an eviction threshold |
6.本轮 node condition 与上次的observed合并,以最新的为准
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//本轮 node condition 与上次的observed合并,以最新的为准
nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
...
}
7.防止Node的资源不断在阈值附近波动,从而不断变动Node Condition值
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//PressureTransitionPeriod参数默认为5分钟
//防止Node的资源不断在阈值附近波动,从而不断变动Node Condition值
//具体查看文档:https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/#oscillation-of-node-conditions
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
if len(nodeConditions) > {
klog.V().Infof("eviction manager: node conditions - transition period not met: %v", nodeConditions)
}
...
}
nodeConditionsObservedSince
Copy
func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period time.Duration, now time.Time) []v1.NodeConditionType {
results := []v1.NodeConditionType{}
for nodeCondition, at := range observedAt {
duration := now.Sub(at)
if duration < period {
results = append(results, nodeCondition)
}
}
return results
}
如果已经超过了5分钟,那么需要排除。
8.对eviction-soft做判断
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//设置 eviction-soft-grace-period,默认为90秒,超过该值加入阈值集合
thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
...
}
thresholdsMetGracePeriod
Copy
func thresholdsMetGracePeriod(observedAt thresholdsObservedAt, now time.Time) []evictionapi.Threshold {
results := []evictionapi.Threshold{}
for threshold, at := range observedAt {
duration := now.Sub(at)
//Soft Eviction Thresholds,必须要等一段时间之后才能进行trigger
if duration < threshold.GracePeriod {
klog.V().Infof("eviction manager: eviction criteria not yet met for %v, duration: %v", formatThreshold(threshold), duration)
continue
}
results = append(results, threshold)
}
return results
}
9.设值,然后比较更新
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
// update internal state
m.Lock()
m.nodeConditions = nodeConditions
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.thresholdsMet = thresholds
// 阈值集合跟上次比较是否需要更新
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)
//将本次的信息设置为上次信息
m.lastObservations = observations
m.Unlock()
...
}
10.排序之后找到第一个需要释放的threshold,以及对应的resource
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//如果没有 eviction signal 集合则本轮结束流程
if len(thresholds) == {
klog.V().Infof("eviction manager: no resources are starved")
return nil
}
//排序之后获取thresholds集合中的第一个元素
sort.Sort(byEvictionPriority(thresholds))
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
if !foundAny {
return nil
}
...
}
getReclaimableThreshold
Copy
func getReclaimableThreshold(thresholds []evictionapi.Threshold) (evictionapi.Threshold, v1.ResourceName, bool) {
//遍历thresholds,然后根据对应的Eviction Signals找到对应的resource
for _, thresholdToReclaim := range thresholds {
if resourceToReclaim, ok := signalToResource[thresholdToReclaim.Signal]; ok {
return thresholdToReclaim, resourceToReclaim, true
}
klog.V().Infof("eviction manager: threshold %s was crossed, but reclaim is not implemented for this threshold.", thresholdToReclaim.Signal)
}
return evictionapi.Threshold{}, "", false
}
下面我们看一下signalToResource的定义:
Copy
signalToResource = map[evictionapi.Signal]v1.ResourceName{}
signalToResource[evictionapi.SignalMemoryAvailable] = v1.ResourceMemory
signalToResource[evictionapi.SignalAllocatableMemoryAvailable] = v1.ResourceMemory
signalToResource[evictionapi.SignalImageFsAvailable] = v1.ResourceEphemeralStorage
signalToResource[evictionapi.SignalImageFsInodesFree] = resourceInodes
signalToResource[evictionapi.SignalNodeFsAvailable] = v1.ResourceEphemeralStorage
signalToResource[evictionapi.SignalNodeFsInodesFree] = resourceInodes
signalToResource[evictionapi.SignalPIDAvailable] = resourcePids
signalToResource将Eviction Signals分成了memory、ephemeral-storage、inodes、pids几类。、
11. 回收节点级别的资源
```go
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//回收节点级别的资源
if m.reclaimNodeLevelResources(thresholdToReclaim.Signal, resourceToReclaim) {
klog.Infof("eviction manager: able to reduce %v pressure without evicting pods.", resourceToReclaim)
return nil
}
...
}
```
**reclaimNodeLevelResources**
```go
func (m *managerImpl) reclaimNodeLevelResources(signalToReclaim evictionapi.Signal, resourceToReclaim v1.ResourceName) bool {
//调用buildSignalToNodeReclaimFuncs中设置的方法
nodeReclaimFuncs := m.signalToNodeReclaimFuncs[signalToReclaim]
for _, nodeReclaimFunc := range nodeReclaimFuncs {
// 删除没用使用到的images或 删除已经是dead状态的Pod 和 container
if err := nodeReclaimFunc(); err != nil {
klog.Warningf("eviction manager: unexpected error when attempting to reduce %v pressure: %v", resourceToReclaim, err)
}
}
//回收之后再检查一下资源占用情况,如果没有达到阈值,那么直接结束
if len(nodeReclaimFuncs) > 0 {
summary, err := m.summaryProvider.Get(true)
if err != nil {
klog.Errorf("eviction manager: failed to get summary stats after resource reclaim: %v", err)
return false
}
observations, _ := makeSignalObservations(summary)
debugLogObservations("observations after resource reclaim", observations)
thresholds := thresholdsMet(m.config.Thresholds, observations, false)
debugLogThresholdsWithObservation("thresholds after resource reclaim - ignoring grace period", thresholds, observations)
if len(thresholds) == 0 {
return true
}
}
return false
}
```
首先根据需要释放的signal从signalToNodeReclaimFuncs中找到对应的释放资源的方法,这个方法在上面buildSignalToNodeReclaimFuncs中设置的,如:
```
nodeReclaimFuncs{containerGC.DeleteAllUnusedContainers, imageGC.DeleteUnusedImages}
```
这个方法会调用相应的GC方法,删除无用的container以及无用的images来释放资源。
然后会检查释放完资源之后是否依然超过阈值,如果没有的话就直接结束了。
12.获取相应的排序函数并进行排序
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
//得到上面的eviction signal 排序函数,在buildSignalToRankFunc方法中设置
rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
if !ok {
klog.Errorf("eviction manager: no ranking function for signal %s", thresholdToReclaim.Signal)
return nil
}
//如果没有 active pod 直接返回
if len(activePods) == {
klog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
return nil
}
//将pod按照特定资源排序
rank(activePods, statsFunc)
...
}
13.将排好序的pod删除,并返回
Copy
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
...
for i := range activePods {
pod := activePods[i]
gracePeriodOverride := int64()
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
//kill pod
if m.evictPod(pod, gracePeriodOverride, message, annotations) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}
}
}
...
}
到这里eviction manager就分析完了~
这一篇讲解了其中资源控制是怎么做的,理解了通过limit和request的设置会影响到pod被删除的优先级,所以我们在设置pod的时候尽量设置合理的limit和request可以不那么容易被kill掉;然后通过分析了源码知道了limit和request会影响到QOS的评分,从而影响到pod被kill掉的优先级。
接下来通过源码分析了k8s中对阈值的设定是怎样的,当资源不够的时候pod是根据什么条件被kill掉的,这一部分花了很大的篇幅来介绍。通过源码也可以知道在eviction发生的时候k8s也是做了很多的考虑,比如说对于节点状态振荡应该怎么处理、首先应该回收什么类型的资源、minimum-reclaim最小回收资源在源码里是怎么做到的等等。
https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/
https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/
https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/
https://kubernetes.io/docs/tasks/administer-cluster/out-of-resource/
https://zhuanlan.zhihu.com/p/38359775
https://cloud.tencent.com/developer/article/1097431