在请求调度的第一部分,我们介绍了api-server是如何限制服务负载以及如何进行请求调度。在调度中,一个非常重要的机制时:api-server需要估计每一个请求产生的负载。根据这个负载,调度器会选择当前需要被执行的请求,并决定是否需要丢弃当前请求。
那么这个负载是如何计算的呢?本文将会深入Kubernetes源码,分析其中的原理。
在Kubernetes中,api-server处理的请求大致可以分为两类:
通过我们对Kubernetes机制的了解,对于查询请求,Kubernetes会从本地缓存或者etcd中读取相应的信息,并返回给客户端。而对于修改请求则相对比较复杂,主要在于,api-server在完成状态更改之后,Kubernetes的其他组件会观察到资源的更改,进而执行相应的操作。这意味着,不同类型的请求对于api-server造成的负载是不同的。因此在api-server中对于不同类型的请求,工作量估计也是完全不同的。
在staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request
中定义了工作量计算的相关函数,其中核心的两个函数是针对修改请求的mutatingWorkEstimator
和针对查询请求的listWorkEstimator
。
在api-server的代码中通过下面的结构体来描述工作量的负载,Seats座位,是Kubernetes用于描述请求造成负载的基本单位,所谓的工作量估计就是计算seats。
// WorkEstimate carries three of the four parameters that determine the work in a request.
// The fourth parameter is the duration of the initial phase of execution.
type WorkEstimate struct {
// InitialSeats is the number of seats occupied while the server is
// executing this request.
InitialSeats uint64
// FinalSeats is the number of seats occupied at the end,
// during the AdditionalLatency.
FinalSeats uint64
// AdditionalLatency specifies the additional duration the seats allocated
// to this request must be reserved after the given request had finished.
// AdditionalLatency should not have any impact on the user experience, the
// caller must not experience this additional latency.
AdditionalLatency time.Duration
}
其中InitialSeats
代表请求本身会带来的负载,对于查询请求,InitialSeats
往往是其负载的总和,因为查询请求并不会带来其他的负载。而对于FinalSeats
则是用于记录请求带来的额外负载,例如更改请求,除了其本身,还有更改之后对整个系统带来的负载。而AdditionalLatency
则是依赖
FinalSeats
计算出的额外延时。下面在工作量估计讲解中,还会对这些字段进行更进一步的解释。
下面的函数是提炼关键步骤之后的listWorkEstimator
的工作量估计函数。
对于listWorkEstimator
来说,主要是通过查询对象的数量来计算最终的请求负载。核心的语句是numStored, err := e.countGetterFn(key(requestInfo))
。根据请求的信息,可以得到我们需要查询的对象的数量。
seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat))
语句则是根据查询对象的数量计算出最终的工作负载seats
。ObjectsPerSeat
的默认值是100。
func (e *listWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimat {
// ........
isListFromCache := requestInfo.Verb == "watch" || !shouldListFromStorage(query, &listOptions)
numStored, err := e.countGetterFn(key(requestInfo))
// ........
switch {
case isListFromCache:
// TODO: For resources that implement indexes at the watchcache level,
// we need to adjust the cost accordingly
estimatedObjectsToBeProcessed = numStored
case listOptions.FieldSelector != "" || listOptions.LabelSelector != "":
estimatedObjectsToBeProcessed = numStored + limit
default:
estimatedObjectsToBeProcessed = 2 * limit
}
seats := uint64(math.Ceil(float64(estimatedObjectsToBeProcessed) / e.config.ObjectsPerSeat))
// ........
return WorkEstimate{InitialSeats: seats}
}
mutatingWorkEstimator
的计算过程也不复杂,因为更改请求往往是针对单一对象发起的,因此InitialSeats
最终设为0。FinalSeats
则与watcher
的数量息息相关,在函数中,采用watchCount := e.countFn(requestInfo)
获取了watcher
的数量。根据watcher
的数量推算出最终FinalSeats
的值。而AdditionalLatency
则可以通过watchCount
直接计算出来。
func (e *mutatingWorkEstimator) estimate(r *http.Request, flowSchemaName, priorityLevelName string) WorkEstimate {
minSeats := e.config.MinimumSeats
maxSeats := e.maxSeatsFn(priorityLevelName)
if maxSeats == 0 || maxSeats > e.config.MaximumSeatsLimit {
maxSeats = e.config.MaximumSeatsLimit
}
if !e.config.Enabled {
return WorkEstimate{
InitialSeats: minSeats,
}
}
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
if !ok {
return WorkEstimate{
InitialSeats: minSeats,
FinalSeats: maxSeats,
AdditionalLatency: e.config.eventAdditionalDuration(),
}
}
if isRequestExemptFromWatchEvents(requestInfo) {
return WorkEstimate{
InitialSeats: minSeats,
FinalSeats: 0,
AdditionalLatency: time.Duration(0),
}
}
watchCount := e.countFn(requestInfo)
metrics.ObserveWatchCount(r.Context(), priorityLevelName, flowSchemaName, watchCount)
var finalSeats uint64
var additionalLatency time.Duration
if watchCount >= int(e.config.WatchesPerSeat) {
finalSeats = uint64(math.Ceil(float64(watchCount) / e.config.WatchesPerSeat))
finalWork := SeatsTimesDuration(float64(finalSeats), e.config.eventAdditionalDuration())
if finalSeats > maxSeats {
finalSeats = maxSeats
}
additionalLatency = finalWork.DurationPerSeat(float64(finalSeats))
}
return WorkEstimate{
InitialSeats: 1,
FinalSeats: finalSeats,
AdditionalLatency: additionalLatency,
}
}
通过对工作量的简单估计,api-server获得了限流和请求调度的关键数据支撑。其实在写业务的时候也可以进行这样的计算,例如在服务器中可以将任务分为两部分的作业:内存中的计算作业和数据库访问作业。根据这两种作业的资源占用(例如对于内存计算作业主要是占用内存资源和CPU资源,而对于数据库访问作业则主要占用的是网络和时间延迟资源)对来访的请求进行限流和调度,提高整个系统的稳定性和效率。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。