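Du Yanghao is a senior engineer at Tencent Cloud with a keen interest in open source, containers, and Kubernetes. He currently works mainly on image registries, high availability and backup/restore for Kubernetes clusters, and edge computing.
SuperEdge is an edge container management system built on native Kubernetes. It extends cloud-native capabilities to the edge, lets the cloud manage and control edge nodes, and greatly simplifies deploying applications from the cloud to the edge. SuperEdge also provides a distributed health check mechanism that avoids the mass pod migration and rebuilding that an unstable cloud-edge network would otherwise cause, keeping services stable.
SuperEdge's distributed health check is made up of edge-health-daemon on the edge side and edge-health-admission on the cloud side.
The overall architecture is shown below:
edge-health-admission exists as a cloud-side component because, when the cloud-edge connection is lost, kube-controller-manager performs the following operations:
When edge-health-daemon determines from its edge-side health checks that a node is healthy, it updates the node to remove the NoExecute taint. However, after that update succeeds, kube-controller-manager writes the taint back (adds NoExecute again). A Kubernetes mutating admission webhook, edge-health-admission, is therefore needed to adjust the changes kube-controller-manager makes to the node API resource, which is what ultimately makes the distributed health check effective.
Before diving into the source code, here is a brief introduction to Kubernetes Admission Controllers[1].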
An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized. The controllers consist of the list below, are compiled into the kube-apiserver binary, and may only be configured by the cluster administrator. In that list, there are two special controllers: MutatingAdmissionWebhook and ValidatingAdmissionWebhook. These execute the mutating and validating (respectively) admission control webhooks which are configured in the API.
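Kubernetes Admission Controllers are a stage in kube-apiserver's handling of API requests. They are invoked after a request has been authenticated and authorized but before the object is persisted, and they validate the request, modify it, or both.
There are many admission controllers, and most are compiled into kube-apiserver. The MutatingAdmissionWebhook and ValidatingAdmissionWebhook controllers are special: they call externally implemented mutating admission control webhooks and validating admission control webhooks, respectively.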
Admission webhooks are HTTP callbacks that receive admission requests and do something with them. You can define two types of admission webhooks, validating admission webhook and mutating admission webhook. Mutating admission webhooks are invoked first, and can modify objects sent to the API server to enforce custom defaults. After all object modifications are complete, and after the incoming object is validated by the API server, validating admission webhooks are invoked and can reject requests to enforce custom policies.
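Admission webhooks are HTTP callback services that receive AdmissionReview requests and process them. Depending on how they process requests, admission webhooks fall into the following categories:
Both types of webhook need to define the following fields for matching requests:
The MutatingWebhookConfiguration for edge-health-admission is given here as a reference example: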
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: edge-health-admission
webhooks:
  - admissionReviewVersions:
      - v1
    clientConfig:
      caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNwRENDQVl3Q0NRQ2RaL0w2akZSSkdqQU5CZ2txaGtpRzl3MEJBUXNGQURBVU1SSXdFQVlEVlFRRERBbFgKYVhObE1tTWdRMEV3SGhjTk1qQXdOekU0TURRek9ERTNXaGNOTkRjeE1qQTBNRFF6T0RFM1dqQVVNUkl3RUFZRApWUVFEREFsWGFYTmxNbU1nUTBFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUJBUUNSCnhHT2hrODlvVkRHZklyVDBrYVkwajdJQVJGZ2NlVVFmVldSZVhVcjh5eEVOQkF6ZnJNVVZyOWlCNmEwR2VFL3cKZzdVdW8vQWtwUEgrbzNQNjFxdWYrTkg1UDBEWHBUd1pmWU56VWtyaUVja3FOSkYzL2liV0o1WGpFZUZSZWpidgpST1V1VEZabmNWOVRaeTJISVF2UzhTRzRBTWJHVmptQXlDMStLODBKdDI3QUl4YmdndmVVTW8xWFNHYnRxOXlJCmM3Zk1QTXJMSHhaOUl5aTZla3BwMnJrNVdpeU5YbXZhSVA4SmZMaEdnTU56YlJaS1RtL0ZKdDdyV0dhQ1orNXgKV0kxRGJYQ2MyWWhmbThqU1BqZ3NNQTlaNURONDU5ellJSkVhSTFHeFI3MlhaUVFMTm8zdE5jd3IzVlQxVlpiTgo1cmhHQlVaTFlrMERtd25vWTBCekFnTUJBQUV3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUhuUDJibnJBcWlWCjYzWkpMVzM0UWFDMnRreVFScTNVSUtWR3RVZHFobWRVQ0I1SXRoSUlleUdVRVdqVExpc3BDQzVZRHh4YVdrQjUKTUxTYTlUY0s3SkNOdkdJQUdQSDlILzRaeXRIRW10aFhiR1hJQ3FEVUVmSUVwVy9ObUgvcnBPQUxhYlRvSUVzeQpVNWZPUy9PVVZUM3ZoSldlRjdPblpIOWpnYk1SZG9zVElhaHdQdTEzZEtZMi8zcEtxRW1Cd1JkbXBvTExGbW9MCmVTUFQ4SjREZExGRkh2QWJKalFVbjhKQTZjOHUrMzZJZDIrWE1sTGRZYTdnTnhvZTExQTl6eFJQczRXdlpiMnQKUXZpbHZTbkFWb0ZUSVozSlpjRXVWQXllNFNRY1dKc3FLMlM0UER1VkNFdlg0SmRCRlA2NFhvU08zM3pXaWhtLworMXg3OXZHMUpFcz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
      service:
        namespace: kube-system
        name: edge-health-admission
        path: /node-taint
    failurePolicy: Ignore
    matchPolicy: Exact
    name: node-taint.k8s.io
    namespaceSelector: {}
    objectSelector: {}
    reinvocationPolicy: Never
    rules:
      - apiGroups:
          - '*'
        apiVersions:
          - '*'
        operations:
          - UPDATE
        resources:
          - nodes
        scope: '*'
    sideEffects: None
    timeoutSeconds: 5
  - admissionReviewVersions:
      - v1
    clientConfig:
      caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNwRENDQVl3Q0NRQ2RaL0w2akZSSkdqQU5CZ2txaGtpRzl3MEJBUXNGQURBVU1SSXdFQVlEVlFRRERBbFgKYVhObE1tTWdRMEV3SGhjTk1qQXdOekU0TURRek9ERTNXaGNOTkRjeE1qQTBNRFF6T0RFM1dqQVVNUkl3RUFZRApWUVFEREFsWGFYTmxNbU1nUTBFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUJBUUNSCnhHT2hrODlvVkRHZklyVDBrYVkwajdJQVJGZ2NlVVFmVldSZVhVcjh5eEVOQkF6ZnJNVVZyOWlCNmEwR2VFL3cKZzdVdW8vQWtwUEgrbzNQNjFxdWYrTkg1UDBEWHBUd1pmWU56VWtyaUVja3FOSkYzL2liV0o1WGpFZUZSZWpidgpST1V1VEZabmNWOVRaeTJISVF2UzhTRzRBTWJHVmptQXlDMStLODBKdDI3QUl4YmdndmVVTW8xWFNHYnRxOXlJCmM3Zk1QTXJMSHhaOUl5aTZla3BwMnJrNVdpeU5YbXZhSVA4SmZMaEdnTU56YlJaS1RtL0ZKdDdyV0dhQ1orNXgKV0kxRGJYQ2MyWWhmbThqU1BqZ3NNQTlaNURONDU5ellJSkVhSTFHeFI3MlhaUVFMTm8zdE5jd3IzVlQxVlpiTgo1cmhHQlVaTFlrMERtd25vWTBCekFnTUJBQUV3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUhuUDJibnJBcWlWCjYzWkpMVzM0UWFDMnRreVFScTNVSUtWR3RVZHFobWRVQ0I1SXRoSUlleUdVRVdqVExpc3BDQzVZRHh4YVdrQjUKTUxTYTlUY0s3SkNOdkdJQUdQSDlILzRaeXRIRW10aFhiR1hJQ3FEVUVmSUVwVy9ObUgvcnBPQUxhYlRvSUVzeQpVNWZPUy9PVVZUM3ZoSldlRjdPblpIOWpnYk1SZG9zVElhaHdQdTEzZEtZMi8zcEtxRW1Cd1JkbXBvTExGbW9MCmVTUFQ4SjREZExGRkh2QWJKalFVbjhKQTZjOHUrMzZJZDIrWE1sTGRZYTdnTnhvZTExQTl6eFJQczRXdlpiMnQKUXZpbHZTbkFWb0ZUSVozSlpjRXVWQXllNFNRY1dKc3FLMlM0UER1VkNFdlg0SmRCRlA2NFhvU08zM3pXaWhtLworMXg3OXZHMUpFcz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
      service:
        namespace: kube-system
        name: edge-health-admission
        path: /endpoint
    failurePolicy: Ignore
    matchPolicy: Exact
    name: endpoint.k8s.io
    namespaceSelector: {}
    objectSelector: {}
    reinvocationPolicy: Never
    rules:
      - apiGroups:
          - '*'
        apiVersions:
          - '*'
        operations:
          - UPDATE
        resources:
          - endpoints
        scope: '*'
    sideEffects: None
    timeoutSeconds: 5
kube-apiserver sends an AdmissionReview (apiGroup: admission.k8s.io, apiVersion: v1 or v1beta1), serialized as JSON, to the webhooks. An example follows:
# This example shows the data contained in an AdmissionReview object for a request to update the scale subresource of an apps/v1 Deployment
{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "request": {
    # Random uid uniquely identifying this admission call
    "uid": "705ab4f5-6393-11e8-b7cc-42010a800002",

    # Fully-qualified group/version/kind of the incoming object
    "kind": {"group":"autoscaling","version":"v1","kind":"Scale"},
    # Fully-qualified group/version/kind of the resource being modified
    "resource": {"group":"apps","version":"v1","resource":"deployments"},
    # subresource, if the request is to a subresource
    "subResource": "scale",

    # Fully-qualified group/version/kind of the incoming object in the original request to the API server.
    # This only differs from `kind` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestKind": {"group":"autoscaling","version":"v1","kind":"Scale"},
    # Fully-qualified group/version/kind of the resource being modified in the original request to the API server.
    # This only differs from `resource` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestResource": {"group":"apps","version":"v1","resource":"deployments"},
    # subresource, if the request is to a subresource
    # This only differs from `subResource` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestSubResource": "scale",

    # Name of the resource being modified
    "name": "my-deployment",
    # Namespace of the resource being modified, if the resource is namespaced (or is a Namespace object)
    "namespace": "my-namespace",

    # operation can be CREATE, UPDATE, DELETE, or CONNECT
    "operation": "UPDATE",

    "userInfo": {
      # Username of the authenticated user making the request to the API server
      "username": "admin",
      # UID of the authenticated user making the request to the API server
      "uid": "014fbff9a07c",
      # Group memberships of the authenticated user making the request to the API server
      "groups": ["system:authenticated","my-admin-group"],
      # Arbitrary extra info associated with the user making the request to the API server.
      # This is populated by the API server authentication layer and should be included
      # if any SubjectAccessReview checks are performed by the webhook.
      "extra": {
        "some-key":["some-value1", "some-value2"]
      }
    },

    # object is the new object being admitted.
    # It is null for DELETE operations.
    "object": {"apiVersion":"autoscaling/v1","kind":"Scale",...},
    # oldObject is the existing object.
    # It is null for CREATE and CONNECT operations.
    "oldObject": {"apiVersion":"autoscaling/v1","kind":"Scale",...},
    # options contains the options for the operation being admitted, like meta.k8s.io/v1 CreateOptions, UpdateOptions, or DeleteOptions.
    # It is null for CONNECT operations.
    "options": {"apiVersion":"meta.k8s.io/v1","kind":"UpdateOptions",...},

    # dryRun indicates the API request is running in dry run mode and will not be persisted.
    # Webhooks with side effects should avoid actuating those side effects when dryRun is true.
    # See http://k8s.io/docs/reference/using-api/api-concepts/#make-a-dry-run-request for more details.
    "dryRun": false
  }
}
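To make the request shape concrete, here is a minimal Go sketch that decodes the handful of fields a webhook such as edge-health-admission would look at first. It uses only the standard library. The `admissionReview`, `admissionRequest`, and `groupVersionResource` types are hypothetical, trimmed-down mirrors of the `admission.k8s.io/v1` objects rather than the real `k8s.io/api` structs, and the node name in the sample is made up.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// groupVersionResource is a hypothetical, trimmed-down mirror of
// metav1.GroupVersionResource.
type groupVersionResource struct {
	Group    string `json:"group"`
	Version  string `json:"version"`
	Resource string `json:"resource"`
}

// admissionRequest models only a few of the request fields shown above.
type admissionRequest struct {
	UID       string               `json:"uid"`
	Resource  groupVersionResource `json:"resource"`
	Name      string               `json:"name"`
	Operation string               `json:"operation"`
	Object    json.RawMessage      `json:"object"`
}

// admissionReview is the envelope; Request is a pointer so that a missing
// "request" key can be detected.
type admissionReview struct {
	APIVersion string            `json:"apiVersion"`
	Kind       string            `json:"kind"`
	Request    *admissionRequest `json:"request,omitempty"`
}

// parseReview decodes the JSON body and rejects reviews without a request.
func parseReview(body []byte) (*admissionReview, error) {
	var ar admissionReview
	if err := json.Unmarshal(body, &ar); err != nil {
		return nil, err
	}
	if ar.Request == nil {
		return nil, errors.New("AdmissionReview carries no request")
	}
	return &ar, nil
}

// sampleReview is an illustrative node UPDATE request (assumed values).
const sampleReview = `{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "request": {
    "uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
    "resource": {"group": "", "version": "v1", "resource": "nodes"},
    "name": "edge-node-1",
    "operation": "UPDATE",
    "object": {"apiVersion": "v1", "kind": "Node"}
  }
}`

func main() {
	ar, err := parseReview([]byte(sampleReview))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%s %s/%s uid=%s\n", ar.Request.Operation,
		ar.Request.Resource.Resource, ar.Request.Name, ar.Request.UID)
}
```

Keeping `object` as `json.RawMessage` mirrors the way the raw object bytes are decoded later, only once the webhook knows which resource type it is handling.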
The webhook must in turn reply to kube-apiserver with an AdmissionReview of the same version, serialized as JSON, containing the following key fields:
An example follows:
# a webhook response to add that label would be:
{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "response": {
    "uid": "<value from request.uid>",
    "allowed": true,
    "patchType": "JSONPatch",
    "patch": "W3sib3AiOiAiYWRkIiwgInBhdGgiOiAiL3NwZWMvcmVwbGljYXMiLCAidmFsdWUiOiAzfV0="
  }
}
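The `patch` value in the response is a base64-encoded JSON Patch document. The sketch below, which uses only the Go standard library, decodes the sample string from the example and then builds a response the other way round. The `patchOp` and `admissionResponse` types are hypothetical simplifications introduced for illustration. Because Go's `encoding/json` serializes a `[]byte` field as a base64 string, assigning the raw patch bytes to a `[]byte` field is enough to get the encoding shown above.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// patchOp is a hypothetical type for one JSON Patch operation.
type patchOp struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// admissionResponse is a hypothetical, trimmed-down response type.
// Patch is []byte, so encoding/json emits it as a base64 string.
type admissionResponse struct {
	UID       string `json:"uid"`
	Allowed   bool   `json:"allowed"`
	PatchType string `json:"patchType,omitempty"`
	Patch     []byte `json:"patch,omitempty"`
}

// samplePatch is the "patch" value from the example response above.
const samplePatch = "W3sib3AiOiAiYWRkIiwgInBhdGgiOiAiL3NwZWMvcmVwbGljYXMiLCAidmFsdWUiOiAzfV0="

// decodePatch base64-decodes a patch and parses its operations.
func decodePatch(encoded string) ([]patchOp, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return nil, err
	}
	var ops []patchOp
	if err := json.Unmarshal(raw, &ops); err != nil {
		return nil, err
	}
	return ops, nil
}

// buildResponse marshals the operations and wraps them in a response.
func buildResponse(uid string, ops []patchOp) ([]byte, error) {
	patchBytes, err := json.Marshal(ops)
	if err != nil {
		return nil, err
	}
	return json.Marshal(admissionResponse{
		UID:       uid,
		Allowed:   true,
		PatchType: "JSONPatch",
		Patch:     patchBytes,
	})
}

func main() {
	ops, err := decodePatch(samplePatch)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("decoded: %+v\n", ops)

	out, err := buildResponse("705ab4f5-6393-11e8-b7cc-42010a800002", ops)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(out))
}
```

The sample decodes to a single `add` operation on `/spec/replicas` with value 3.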
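edge-health-admission is in fact a mutating admission webhook that selectively modifies UPDATE requests for endpoints and nodes. Its workings are analyzed in detail below.
edge-health-admission is written entirely along the lines of the official example[5]. Its listening entry point is as follows: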
func (eha *EdgeHealthAdmission) Run(stopCh <-chan struct{}) {
	if !cache.WaitForNamedCacheSync("edge-health-admission", stopCh, eha.cfg.NodeInformer.Informer().HasSynced) {
		return
	}

	http.HandleFunc("/node-taint", eha.serveNodeTaint)
	http.HandleFunc("/endpoint", eha.serveEndpoint)
	server := &http.Server{
		Addr: eha.cfg.Addr,
	}

	go func() {
		if err := server.ListenAndServeTLS(eha.cfg.CertFile, eha.cfg.KeyFile); err != http.ErrServerClosed {
			klog.Fatalf("ListenAndServeTLS err %+v", err)
		}
	}()

	for {
		select {
		case <-stopCh:
			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
			defer cancel()
			if err := server.Shutdown(ctx); err != nil {
				klog.Errorf("Server: program exit, server exit error %+v", err)
			}
			return
		default:
		}
	}
}
Two route handlers are registered here:
Both handlers call the serve function, shown below:
// serve handles the http portion of a request prior to handing to an admit function
func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) {
	var body []byte
	if r.Body != nil {
		if data, err := ioutil.ReadAll(r.Body); err == nil {
			body = data
		}
	}

	// verify the content type is accurate
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/json" {
		klog.Errorf("contentType=%s, expect application/json", contentType)
		return
	}

	klog.V(4).Info(fmt.Sprintf("handling request: %s", body))

	// The AdmissionReview that was sent to the webhook
	requestedAdmissionReview := admissionv1.AdmissionReview{}

	// The AdmissionReview that will be returned
	responseAdmissionReview := admissionv1.AdmissionReview{}

	deserializer := codecs.UniversalDeserializer()
	if _, _, err := deserializer.Decode(body, nil, &requestedAdmissionReview); err != nil {
		klog.Error(err)
		responseAdmissionReview.Response = toAdmissionResponse(err)
	} else {
		// pass to admitFunc
		responseAdmissionReview.Response = admit(requestedAdmissionReview)
	}

	// Return the same UID
	responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID

	klog.V(4).Info(fmt.Sprintf("sending response: %+v", responseAdmissionReview.Response))

	respBytes, err := json.Marshal(responseAdmissionReview)
	if err != nil {
		klog.Error(err)
	}
	if _, err := w.Write(respBytes); err != nil {
		klog.Error(err)
	}
}
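One detail in the listing is worth spelling out: the UID is copied by dereferencing both `Response` and `Request`. `Response` is nil whenever the admit function returns nil (the admit functions quoted later do so on a resource mismatch), and `Request` appears to stay nil when decoding fails, so either case would trigger a nil-pointer panic. The following is a minimal, standard-library-only sketch of a guarded variant. It is not the SuperEdge implementation: the types are hypothetical stand-ins, the `Message` field simplifies the real status object, and treating a nil admit result as "allowed, no patch" is an assumption chosen to match the spirit of `failurePolicy: Ignore`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical, trimmed-down stand-ins for the admission/v1 types.
type admissionRequest struct {
	UID string `json:"uid"`
}

type admissionResponse struct {
	UID     string `json:"uid"`
	Allowed bool   `json:"allowed"`
	Message string `json:"message,omitempty"` // simplification of status.message
}

type admissionReview struct {
	Request  *admissionRequest  `json:"request,omitempty"`
	Response *admissionResponse `json:"response,omitempty"`
}

type admitFunc func(admissionReview) *admissionResponse

// review decodes the body, calls admit, and echoes the request UID,
// guarding against a nil request and a nil admit result.
func review(body []byte, admit admitFunc) admissionReview {
	var in, out admissionReview
	if err := json.Unmarshal(body, &in); err != nil {
		// Decoding failed: report the error and do not allow.
		out.Response = &admissionResponse{Message: err.Error()}
	} else {
		out.Response = admit(in)
	}
	if out.Response == nil {
		// Assumed policy: a nil admit result means "allow without a patch".
		out.Response = &admissionResponse{Allowed: true}
	}
	if in.Request != nil {
		out.Response.UID = in.Request.UID
	}
	return out
}

func main() {
	nilAdmit := func(admissionReview) *admissionResponse { return nil }

	ok := review([]byte(`{"request":{"uid":"abc"}}`), nilAdmit)
	fmt.Printf("%+v\n", *ok.Response)

	bad := review([]byte("not json"), nilAdmit)
	fmt.Printf("allowed=%v\n", bad.Response.Allowed)
}
```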
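The serve logic is as follows:
The admit functions behind serveNodeTaint and serveEndpoint are mutateNodeTaint and mutateEndpoint respectively; they are analyzed in turn below:
mutateNodeTaint modifies node UPDATE requests according to the distributed health check result: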
func (eha *EdgeHealthAdmission) mutateNodeTaint(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
	klog.V(4).Info("mutating node taint")
	nodeResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
	if ar.Request.Resource != nodeResource {
		klog.Errorf("expect resource to be %s", nodeResource)
		return nil
	}

	var node corev1.Node
	deserializer := codecs.UniversalDeserializer()
	if _, _, err := deserializer.Decode(ar.Request.Object.Raw, nil, &node); err != nil {
		klog.Error(err)
		return toAdmissionResponse(err)
	}

	reviewResponse := admissionv1.AdmissionResponse{}
	reviewResponse.Allowed = true

	if index, condition := util.GetNodeCondition(&node.Status, v1.NodeReady); index != -1 && condition.Status == v1.ConditionUnknown {
		if node.Annotations != nil {
			var patches []*patch
			if healthy, existed := node.Annotations[common.NodeHealthAnnotation]; existed && healthy == common.NodeHealthAnnotationPros {
				if index, existed := util.TaintExistsPosition(node.Spec.Taints, common.UnreachableNoExecuteTaint); existed {
					patches = append(patches, &patch{
						OP:   "remove",
						Path: fmt.Sprintf("/spec/taints/%d", index),
					})
					klog.V(4).Infof("UnreachableNoExecuteTaint: remove %d taints %s", index, node.Spec.Taints[index])
				}
			}
			if len(patches) > 0 {
				patchBytes, _ := json.Marshal(patches)
				reviewResponse.Patch = patchBytes
				pt := admissionv1.PatchTypeJSONPatch
				reviewResponse.PatchType = &pt
			}
		}
	}

	return &reviewResponse
}
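The main logic is as follows:
In short, mutateNodeTaint keeps correcting the node state written by kube-controller-manager by removing the NoExecute (node.kubernetes.io/unreachable) taint, so that the pods on the node are not evicted.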
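The decision made by mutateNodeTaint can be reduced to a small pure function, sketched here with the standard library only. The annotation key and value (`healthAnnotation`, `healthyValue`) are placeholders, since the actual values of `common.NodeHealthAnnotation` and `common.NodeHealthAnnotationPros` are not shown in the article. The `taint` type and the JSON tags on `patch` are likewise assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	healthAnnotation = "example.io/node-health" // placeholder, not the real key
	healthyValue     = "true"                   // placeholder, not the real value
	unreachableKey   = "node.kubernetes.io/unreachable"
	noExecute        = "NoExecute"
)

// taint is a hypothetical, trimmed-down stand-in for corev1.Taint.
type taint struct {
	Key    string
	Effect string
}

// patch is one JSON Patch operation; the JSON tags are assumed.
type patch struct {
	OP    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// nodeTaintPatches returns a "remove" patch for the unreachable NoExecute
// taint only when the Ready condition is Unknown and the node carries the
// "healthy" annotation written by the edge-side health check.
func nodeTaintPatches(readyStatus string, annotations map[string]string, taints []taint) []patch {
	if readyStatus != "Unknown" || annotations[healthAnnotation] != healthyValue {
		return nil
	}
	for i, t := range taints {
		if t.Key == unreachableKey && t.Effect == noExecute {
			// JSON Patch addresses array elements by index.
			return []patch{{OP: "remove", Path: fmt.Sprintf("/spec/taints/%d", i)}}
		}
	}
	return nil
}

func main() {
	taints := []taint{
		{Key: "node.kubernetes.io/unreachable", Effect: "NoSchedule"},
		{Key: unreachableKey, Effect: noExecute},
	}
	annotations := map[string]string{healthAnnotation: healthyValue}

	patches := nodeTaintPatches("Unknown", annotations, taints)
	b, _ := json.Marshal(patches)
	fmt.Println(string(b)) // prints [{"op":"remove","path":"/spec/taints/1"}]
}
```

Only the NoExecute taint is targeted; a NoSchedule taint with the same key is left in place, so new pods are still kept off a node the control plane cannot reach.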
mutateEndpoint modifies endpoints UPDATE requests according to the distributed health check result:
func (eha *EdgeHealthAdmission) mutateEndpoint(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
	klog.V(4).Info("mutating endpoint")
	endpointResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
	if ar.Request.Resource != endpointResource {
		klog.Errorf("expect resource to be %s", endpointResource)
		return nil
	}

	var endpoint corev1.Endpoints
	deserializer := codecs.UniversalDeserializer()
	if _, _, err := deserializer.Decode(ar.Request.Object.Raw, nil, &endpoint); err != nil {
		klog.Error(err)
		return toAdmissionResponse(err)
	}

	reviewResponse := admissionv1.AdmissionResponse{}
	reviewResponse.Allowed = true

	for epSubsetIndex, epSubset := range endpoint.Subsets {
		for notReadyAddrIndex, EndpointAddress := range epSubset.NotReadyAddresses {
			if node, err := eha.nodeLister.Get(*EndpointAddress.NodeName); err == nil {
				if index, condition := util.GetNodeCondition(&node.Status, v1.NodeReady); index != -1 && condition.Status == v1.ConditionUnknown {
					if node.Annotations != nil {
						var patches []*patch
						if healthy, existed := node.Annotations[common.NodeHealthAnnotation]; existed && healthy == common.NodeHealthAnnotationPros {
							// TODO: handle readiness probes failure
							// Remove address on node from endpoint notReadyAddresses
							patches = append(patches, &patch{
								OP:   "remove",
								Path: fmt.Sprintf("/subsets/%d/notReadyAddresses/%d", epSubsetIndex, notReadyAddrIndex),
							})

							// Add address on node to endpoint readyAddresses
							TargetRef := map[string]interface{}{}
							TargetRef["kind"] = EndpointAddress.TargetRef.Kind
							TargetRef["namespace"] = EndpointAddress.TargetRef.Namespace
							TargetRef["name"] = EndpointAddress.TargetRef.Name
							TargetRef["uid"] = EndpointAddress.TargetRef.UID
							TargetRef["apiVersion"] = EndpointAddress.TargetRef.APIVersion
							TargetRef["resourceVersion"] = EndpointAddress.TargetRef.ResourceVersion
							TargetRef["fieldPath"] = EndpointAddress.TargetRef.FieldPath

							patches = append(patches, &patch{
								OP:   "add",
								Path: fmt.Sprintf("/subsets/%d/addresses/0", epSubsetIndex),
								Value: map[string]interface{}{
									"ip":        EndpointAddress.IP,
									"hostname":  EndpointAddress.Hostname,
									"nodeName":  EndpointAddress.NodeName,
									"targetRef": TargetRef,
								},
							})

							if len(patches) != 0 {
								patchBytes, _ := json.Marshal(patches)
								reviewResponse.Patch = patchBytes
								pt := admissionv1.PatchTypeJSONPatch
								reviewResponse.PatchType = &pt
							}
						}
					}
				}
			} else {
				klog.Errorf("Get pod's node err %+v", err)
			}
		}
	}

	return &reviewResponse
}
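The main logic is as follows:
In short, mutateEndpoint keeps correcting the endpoints state written by kube-controller-manager: it moves workloads on nodes that the distributed health check considers healthy from endpoints.Subset.NotReadyAddresses to endpoints.Subset.Addresses, so the service stays available.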
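The move from NotReadyAddresses to Addresses is expressed as a pair of JSON Patch operations. The sketch below builds that pair for a single address using only the standard library. The `endpointAddress` type, the JSON tags on `patch`, and the sample IP and node name are assumptions for illustration, and the `targetRef` and `hostname` fields from the listing are omitted for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// patch is one JSON Patch operation; the JSON tags are assumed.
type patch struct {
	OP    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// endpointAddress is a hypothetical, trimmed-down stand-in for
// corev1.EndpointAddress.
type endpointAddress struct {
	IP       string `json:"ip"`
	NodeName string `json:"nodeName,omitempty"`
}

// moveToReady returns the two operations that take the address at
// notReadyIndex out of notReadyAddresses and insert it at the front of
// addresses within the same subset.
func moveToReady(subsetIndex, notReadyIndex int, addr endpointAddress) []patch {
	return []patch{
		{
			OP:   "remove",
			Path: fmt.Sprintf("/subsets/%d/notReadyAddresses/%d", subsetIndex, notReadyIndex),
		},
		{
			OP:    "add",
			Path:  fmt.Sprintf("/subsets/%d/addresses/0", subsetIndex),
			Value: addr,
		},
	}
}

func main() {
	// Sample values are made up for the example.
	addr := endpointAddress{IP: "10.0.0.5", NodeName: "edge-node-1"}
	patches := moveToReady(0, 2, addr)

	b, err := json.MarshalIndent(patches, "", "  ")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b))
}
```

JSON Patch operations are applied in order, so the `remove` is evaluated before the `add`. Since the two operations touch different arrays, the insertion index 0 is unaffected by the removal.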
[1] Kubernetes Admission Controllers: https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/
[2] validating admission webhook: https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#validatingadmissionwebhook
[3] mutating admission webhook: https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#mutatingadmissionwebhook
[4] JSON patch operations: https://jsonpatch.com/
[5] Official example: https://github.com/kubernetes/kubernetes/blob/v1.13.0/test/images/webhook/main.go
[6] validating admission webhook: https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#validatingadmissionwebhook
[7] mutating admission webhook: https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#mutatingadmissionwebhook