📢 注意,该文本非最终版本,正在更新中,版权所有,请勿转载!!
资源在 k8s 中是一个非常重要的关键因素,一些运维事故往往也就是因为一些资源限制设置的不合理而导致的。而合理的设置资源也是一门学问和经验,最近不停地被提及的 “降本增效” 通常也伴随着资源设置的优化。对于一个应用应该设置多少内存和 CPU,我觉得这不是我们在这里应该学习的(这都是实战经验积累的)。而我们需要知道的是,这些限制条件何时会被检查,会被谁检查,超过限制条件会引发什么问题。 这对于我们来说很重要,一方面实际出现问题,我们可以迅速知道原因;另一方面,这些限制条件还会和之后的调度、自动扩容/缩容有关系。所以本章节我们来看看它。
这次的寻码就有点艰难了。我的第一个落脚点是 pkg/kubelet/eviction/eviction_manager.go
我没有直接去找 limit 和 request 的原因是我更在意驱逐,驱逐会直接导致最终 pod 被调度,而 limit 是触发的关键。所以我就看到了这个包名是 eviction
(驱逐),然后这个文件名称是 eviction_manager
,好家伙,就决定是它了。
// pkg/kubelet/eviction/types.go:57
// Manager evaluates when an eviction threshold for node stability has been met on the node.
type Manager interface {
// Start starts the control loop to monitor eviction thresholds at specified interval.
Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration)
// IsUnderMemoryPressure returns true if the node is under memory pressure.
IsUnderMemoryPressure() bool
// IsUnderDiskPressure returns true if the node is under disk pressure.
IsUnderDiskPressure() bool
// IsUnderPIDPressure returns true if the node is under PID pressure.
IsUnderPIDPressure() bool
}
其中从上面的 Manager 接口其中的定义看方法名便知道其基本能力,其中 Start
方法最为关键,于是去找具体实现。
我们可以看到实现接口的是 managerImpl
(可以,这个命名很 java) 实现了 Manager
接口,然后看关键的 Start
方法
// pkg/kubelet/eviction/eviction_manager.go:178
// Start starts the control loop to observe and response to low compute resources.
func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) {
thresholdHandler := func(message string) {
klog.InfoS(message)
m.synchronize(diskInfoProvider, podFunc)
}
if m.config.KernelMemcgNotification {
for _, threshold := range m.config.Thresholds {
if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable {
notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler)
if err != nil {
klog.InfoS("Eviction manager: failed to create memory threshold notifier", "err", err)
} else {
go notifier.Start()
m.thresholdNotifiers = append(m.thresholdNotifiers, notifier)
}
}
}
}
// start the eviction manager monitoring
go func() {
for {
if evictedPods := m.synchronize(diskInfoProvider, podFunc); evictedPods != nil {
klog.InfoS("Eviction manager: pods evicted, waiting for pod to be cleaned up", "pods", klog.KObjSlice(evictedPods))
m.waitForPodsCleanup(podCleanedUpFunc, evictedPods)
} else {
time.Sleep(monitoringInterval)
}
}
}()
}
其中就有几个细节:
NewMemoryThresholdNotifier
Notifier
都是一个独立的协程去启动 go notifier.Start()
m.synchronize
看到这里,我就有了我的第一个问题:
NewMemoryThresholdNotifier
方法显然是针对内存的,那么 CPU 呢?难道 CPU 突破限制的通知放在别的地方? 这里就涉及一个之前提到的我看源码的一个方法,此时就可能是一个分叉,如果我现在去搜索有关 CPU 的相关问题,那么就会打破我现在的思路,所以我会选择先把这个问题记录下来,回头再寻找答案。
先小结一下:也就是说有一个专门管理驱逐的 Manager,它会启动一些协程去关注 pod 的内存;同时会同步 pod 状态,如果发现需要驱逐的 pod 则进行 cleanup。那么下面我的目标就是如何去监控 pod 的内存呢?
我在没有看过源码之前,对于 cgroup 是有一个简单的了解的,知道 docker 就是通过 linux 的 namespace 和 cgroup 来隔离的。但我不明白的是,通知是怎么来的,如果让我自己去实现那么肯定是定期循环查询内存超过阈值则进行通知,肯定性能不好。
于是,我就追着 go notifier.Start()
的 Start 找到了
// pkg/kubelet/eviction/memory_threshold_notifier.go:73
func (m *memoryThresholdNotifier) Start() {
klog.InfoS("Eviction manager: created memoryThresholdNotifier", "notifier", m.Description())
for range m.events {
m.handler(fmt.Sprintf("eviction manager: %s crossed", m.Description()))
}
}
可以看到,这里非常简单,就是不断地 handler events 这个 channel 的事件。所以,我们需要找到哪里在往 events 这个 channel 里面写入事件。引用位置只有一个那就是 UpdateThreshold
。
// pkg/kubelet/eviction/memory_threshold_notifier.go:80
func (m *memoryThresholdNotifier) UpdateThreshold(summary *statsapi.Summary) error {
// .....
newNotifier, err := m.factory.NewCgroupNotifier(m.cgroupPath, memoryUsageAttribute, memcgThreshold.Value())
if err != nil {
return err
}
m.notifier = newNotifier
go m.notifier.Start(m.events)
return nil
}
这里我们就见到主角了,NewCgroupNotifier
也就是 Cgroup
了。这里有个细节是 factory 是 NotifierFactory
也就是利用了设计模式中的工厂模式,抽象了一下生成的方法。
读源码注意事项:通常我们用的是非 Linux 的电脑阅读源码,于是在 IDE 中跳转的时候可能会有不同。比如,此时我在 Mac 下,默认点击
NewCgroupNotifier
方法最终会跳到pkg/kubelet/eviction/threshold_notifier_unsupported.go
中(IDE 会根据你当前使用的系统来进行跳转),而在非 Linux 下 cgroup 当然是没有的。但其实我们应该看的应该是 Linux 下的实现:pkg/kubelet/eviction/threshold_notifier_linux.go
本节的重点在这里,理解了 linuxCgroupNotifier 的实现,那么以后或许你也可以在其他项目中利用 cgroup 的特性来实现对内存用量的控制或管理。
threshold_notifier_linux.go
整个文件就一共 200 行,不多。分成三个部分:初始化、启动、等待。
源码阅读技巧:通常来说看 golang 的代码很长的时候,你可以先把所有的
if err != nil
去掉看看。
// pkg/kubelet/eviction/threshold_notifier_linux.go:49
// NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required
// to receive notifications from the cgroup when the threshold is crossed in either direction.
func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) {
// ....
var watchfd, eventfd, epfd, controlfd int
var err error
watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY|unix.O_CLOEXEC, 0)
// ....
controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY|unix.O_CLOEXEC, 0)
// ....
eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC)
// ....
epfd, err = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
// ....
config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold)
_, err = unix.Write(controlfd, []byte(config))
if err != nil {
return nil, err
}
return &linuxCgroupNotifier{
eventfd: eventfd,
epfd: epfd,
stop: make(chan struct{}),
}, nil
}
去掉之后,其实主干就非常清楚了:
看完初始化的方法,其实我大致也能猜到了,既然有了 epoll,有了 fd,大概率就是将 fd 通过 controlfd 也就是 cgroup.event_control
注册给 cgroup,这样当出现内存变化的时候将具体事件通过 eventfd 通知回来。
Start
方法就非常简单了,记得这个 Start
就是 eviction_manager 为每个 pod 配置的阈值来调用启动的。
# pkg/kubelet/eviction/threshold_notifier_linux.go:110
func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) {
err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{
Fd: int32(n.eventfd),
Events: unix.EPOLLIN,
})
// ...
buf := make([]byte, eventSize)
for {
select {
case <-n.stop:
return
default:
}
event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval)
// ...
_, err = unix.Read(n.eventfd, buf)
// ...
eventCh <- struct{}{}
}
}
这是一个标准的 epoll 用法了,就是通过 EpollCtl
将 fd 添加进去,然后 wait 有事件之后就将事件发送到 eventCh 通道里面就好了。所以到这里我觉得 wait
方法已经不需要看了,肯定就是 epll wait 没跑了。有兴趣的同学可以看下,我这里就不贴了。
至此,我们总结一下,linuxCgroupNotifier
的实现其本质就是利用了 cgroup 的 event 机制,说白了就是以格式 eventfd watchfd threshold
写入 cgroup.event_control
就可以了,然后使用 epoll 来等着事件来。所以我在一开始就提到了,或许以后当你想利用 linux 的 cgroup 机制来监控内存时,整个代码你是可以直接抄的。这也是我们阅读源码其中一个非常重要的好处,积累一些工具或方法的设计和写法。
前面我们看到的都是内存,那么其他资源的限制呢?
还记得我们在 managerImpl
中看到的 Start
方法吗?不记得你可以回到上面再看下,在最后有一个调用 synchronize
的过程,这个方法会返回一个需要被驱逐的 pod。于是乎,我们需要知道在 synchronize
方法中是如何得到需要被驱逐的 pod 的。
源码阅读技巧:synchronize 方法特别长(之前是哪个代码规范写的说一个函数不能超过多少行来着?你看看别人 k8s 不照样写成这样吗?手动狗头~),还是一样的方法,我们需要抓主干。
其实大致的过程我们是可以猜到的,我问你如果是你,要找到一个需要驱逐的 pod 你会怎么做?是不是下面的思路
是的,思路无非就是如此,但是其他细节很多。
// pkg/kubelet/eviction/eviction_manager.go:233
func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod {
// ...
// 获取所有 pod 的使用量 summary
activePods := podFunc()
updateStats := true
summary, err := m.summaryProvider.Get(ctx, updateStats)
if err != nil {
klog.ErrorS(err, "Eviction manager: failed to get summary stats")
return nil
}
// ...
// 这里面最关键的就是 observations 和 thresholds 比较二者大小就知道是否满足阈值
// make observations and get a function to derive pod usage stats relative to those observations.
observations, statsFunc := makeSignalObservations(summary)
// determine the set of thresholds met independent of grace period
thresholds = thresholdsMet(thresholds, observations, false)
// node conditions report true if it has been observed within the transition period window
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
if len(nodeConditions) > 0 {
klog.V(3).InfoS("Eviction manager: node conditions - transition period not met", "nodeCondition", nodeConditions)
}
// determine the set of thresholds we need to drive eviction behavior (i.e. all grace periods are met)
thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now)
// ...
// 这里一个细节,会优先检查 local volume 是已经超限了
// evict pods if there is a resource usage violation from local volume temporary storage
// If eviction happens in localStorageEviction function, skip the rest of eviction action
if m.localStorageCapacityIsolation {
if evictedPods := m.localStorageEviction(activePods, statsFunc); len(evictedPods) > 0 {
return evictedPods
}
}
// 关键来了,按驱逐优先级进行排序所有阈值来得到有无超限
// rank the thresholds by eviction priority
sort.Sort(byEvictionPriority(thresholds))
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
if !foundAny {
return nil
}
// 细节来了,会先进行一次GC,如果 GC 之后能满足条件则就不需要驱逐了
// check if there are node-level resources we can reclaim to reduce pressure before evicting end-user pods.
if m.reclaimNodeLevelResources(ctx, thresholdToReclaim.Signal, resourceToReclaim) {
return nil
}
// GC 之后再来排序 pod,被优先驱逐的肯定是 “大头”
// rank the running pods for eviction for the specified resource
rank(activePods, statsFunc)
// 最后 for 循环出第一个需要驱逐的 pod 包装一下就可以返回了
// we kill at most a single pod during each eviction interval
for i := range activePods {
pod := activePods[i]
gracePeriodOverride := int64(0)
if !isHardEvictionThreshold(thresholdToReclaim) {
gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
}
message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc, thresholds, observations)
var condition *v1.PodCondition
if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) {
condition = &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: v1.PodReasonTerminationByKubelet,
Message: message,
}
}
if m.evictPod(pod, gracePeriodOverride, message, annotations, condition) {
metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
return []*v1.Pod{pod}
}
}
klog.InfoS("Eviction manager: unable to evict any pods from the node")
return nil
}
那么关键的是 pod 的那些指标会被收集呢?于是我们查看一下 summary 的结构会发现:
// vendor/k8s.io/kubelet/pkg/apis/stats/v1alpha1/types.go:107
type PodStats struct {
PodRef PodReference `json:"podRef"`
StartTime metav1.Time `json:"startTime"`
Containers []ContainerStats `json:"containers" patchStrategy:"merge" patchMergeKey:"name"`
CPU *CPUStats `json:"cpu,omitempty"`
Memory *MemoryStats `json:"memory,omitempty"`
Network *NetworkStats `json:"network,omitempty"`
VolumeStats []VolumeStats `json:"volume,omitempty" patchStrategy:"merge" patchMergeKey:"name"`
EphemeralStorage *FsStats `json:"ephemeral-storage,omitempty"`
ProcessStats *ProcessStats `json:"process_stats,omitempty"`
Swap *SwapStats `json:"swap,omitempty"`
}
就是 Pod 的这些指标了,CPU、内存、磁盘等等这些都在了。那么具体这些指标如何获取的,有兴趣的同学可以追着继续看一下。同样的,节点也有统计状态,这里也不列举了,都在 summary 里面。
synchronize
立刻检查;还有一种就是定时执行 synchronize
间隔是 monitoringInterval
默认是 10sQoS 这个小知识点是容易被忽略的,当节点上资源紧张时,kubernetes 会根据预先设置的不同 QoS 类别进行相应处理。我最开始使用 k8s 的时候也没有掌握这个知识点,导致了一些问题。在这里我不做过多的介绍,你可以简单的理解为下面三种情况:
具体各个情况的说明参考官网文档:https://kubernetes.io/zh-cn/docs/concepts/workloads/pods/pod-qos/
然后,给出我自己的最佳实践
这也是一个容易遗漏的小知识点,也很容易理解:
有两个设计可以值得我们学习
Start
也可以学习。memoryThresholdNotifier
的 NotifierFactory
可以算是一个很标准的工厂模式了,定义接口实现,通过 factory 来创建 Notifier。
// NotifierFactory creates CgroupNotifer
type NotifierFactory interface {
// NewCgroupNotifier creates a CgroupNotifier that creates events when the threshold
// on the attribute in the cgroup specified by the path is crossed.
NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error)
}
这也是看源码的一个好处,如果你不知道一个设计模式应该如何使用或者没有最佳实践,看看别人实际中的使用可以让你最快学会它
如果后面有 cgroup 的使用需求,建议查看 man 文档 https://man7.org/linux/man-pages/man7/cgroups.7.html
因为 CPU 不是一个和内存一样的可以被很好量化的指标,它通常是只在一个采样周期内指标。而 k8s 采用的是 CFS,也就是说在一个采样周期内如何达到 limit,就开始限流了。所以 limit 限制过小,会导致一些突然的波峰 CPU 使用不停地被限流。并且其中还有与低版本内核 bug 相关的一些各种问题。总之你记住,给我的感觉是:“不准且复杂”。不过 CPU 密集型的业务实际不多,所以 CPU 的 limit 通常来说我的建议都是先给经验值,然后根据压测的情况去调整。
有一个国外的案例供你参考:https://medium.com/omio-engineering/cpu-limits-and-aggressive-throttling-in-kubernetes-c5b20bd8a718