前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文读懂 SuperEdge 分布式健康检查(云端)

一文读懂 SuperEdge 分布式健康检查(云端)

作者头像
腾讯云原生
发布2021-03-23 18:11:46
9210
发布2021-03-23 18:11:46
举报

杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事镜像仓库、Kubernetes集群高可用&备份还原,以及边缘计算相关研发工作。

前言

SuperEdge 介绍

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制,极大简化了应用从云端部署到边缘端的过程。同时 SuperEdge 设计了分布式健康检查机制规避了云边网络不稳定造成的大量pod迁移和重建,保证了服务的稳定。

SuperEdge 分布式健康检查云端组件

SuperEdge 分布式健康检查功能由边端的 edge-health-daemon 以及云端的 edge-health-admission 组成:

  • edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver发送健康状态投票结果(给node打annotation)
  • edge-health-admission:不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint(去掉NoExecute taint)以及endpoints (将失联节点上的 pods 从 endpoint subsets notReadyAddresses 移到 addresses 中),从而实现云端和边端共同决定节点状态

整体架构如下所示:

之所以创建 edge-health-admission 云端组件,是因为当云边断连时,kube-controller-manager 会执行如下操作:

  • 失联的节点被置为 ConditionUnknown 状态,并被添加 NoSchedule 和 NoExecute 的 taints
  • 失联的节点上的 pod 从 Service 的 Endpoint 列表中移除

当 edge-health-daemon 在边端根据健康检查判断节点状态正常时,会更新 node:去掉 NoExecute taint。但是在 node 成功更新之后又会被 kube-controller-manager 给刷回去(再次添加 NoExecute taint),因此必须添加 Kubernetes mutating admission webhook 也即 edge-health-admission,将 kube-controller-manager 对 node api resource 的更改做调整,最终实现分布式健康检查效果

在深入源码之前先介绍一下Kubernetes Admission Controllers[1]

An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized. The controllers consist of the list below, are compiled into the kube-apiserver binary, and may only be configured by the cluster administrator. In that list, there are two special controllers: MutatingAdmissionWebhook and ValidatingAdmissionWebhook. These execute the mutating and validating (respectively) admission control webhooks which are configured in the API.

Kubernetes Admission Controllers 是 kube-apiserver 处理api请求的某个环节,用于在api请求认证&鉴权之后,对象持久化之前进行调用,对请求进行校验或者修改(or both)。

Kubernetes Admission Controllers 包括多种 admission,大多数都内嵌在 kube-apiserver 代码中了。其中 MutatingAdmissionWebhook以及ValidatingAdmissionWebhook controller 比较特殊,它们分别会调用外部构造的 mutating admission control webhooks以及validating admission control webhooks。

Admission webhooks are HTTP callbacks that receive admission requests and do something with them. You can define two types of admission webhooks, validating admission webhook and mutating admission webhook. Mutating admission webhooks are invoked first, and can modify objects sent to the API server to enforce custom defaults. After all object modifications are complete, and after the incoming object is validated by the API server, validating admission webhooks are invoked and can reject requests to enforce custom policies.

Admission Webhooks 是一个HTTP回调服务,接受 AdmissionReview 请求并进行处理,按照处理方式的不同,可以将 Admission Webhooks 分类如下:

  • validating admission webhook[2]:通过 ValidatingWebhookConfiguration 配置,会对api请求进行准入校验,但是不能修改请求对象
  • mutating admission webhook[3]:通过 MutatingWebhookConfiguration 配置,会对api请求进行准入校验以及修改请求对象

两种类型的 webhooks 都需要定义如下 Matching requests 字段:

  • admissionReviewVersions:定义了apiserver 所支持的 AdmissionReview api resoure 的版本列表 (API servers send the first AdmissionReview version in the admissionReviewVersions list they support)
  • name:webhook 名称(如果一个 WebhookConfiguration 中定义了多个 webhooks,需要保证名称的唯一性)
  • clientConfig:定义了 webhook server 的访问地址 (url or service)以及 CA bundle(optionally include a custom CA bundle to use to verify the TLS connection)
  • namespaceSelector:限定了匹配请求资源的命名空间 labelSelector
  • objectSelector:限定了匹配请求资源本身的 labelSelector
  • rules:限定了匹配请求的 operations,apiGroups,apiVersions,resources以及resource scope,如下:
    • operations:规定了请求操作列表(Can be "CREATE", "UPDATE", "DELETE", "CONNECT", or "*" to match all.)
    • apiGroups:规定了请求资源的 API groups 列表("" is the core API group. "*" matches all API groups.)
    • apiVersions:规定了请求资源的 API versions 列表("*" matches all API versions.)
    • resources:规定了请求资源类型(node, deployment and etc)
    • scope:规定了请求资源的范围(Cluster,Namespaced or *)
  • timeoutSeconds:规定了 webhook 回应的超时时间,如果超时了,根据 failurePolicy 进行处理
  • failurePolicy:规定了 apiserver 对 admission webhook 请求失败的处理策略:
    • Ignore:means that an error calling the webhook is ignored and the API request is allowed to continue.
    • Fail:means that an error calling the webhook causes the admission to fail and the API request to be rejected.
  • matchPolicy:规定了 rules 如何匹配到来的 api 请求,如下:
    • Exact:完全匹配 rules 列表限制
    • Equivalent:如果修改请求资源 (apiserver 可以实现对象在不同版本的转化)可以转化为能够配置 rules 列表限制,则认为该请求匹配,可以发送给 admission webhook
  • reinvocationPolicy:In v1.15+, to allow mutating admission plugins to observe changes made by other plugins, built-in mutating admission plugins are re-run if a mutating webhook modifies an object, and mutating webhooks can specify a reinvocationPolicy to control whether they are reinvoked as well.
    • Never: the webhook must not be called more than once in a single admission evaluation.
    • IfNeeded: the webhook may be called again as part of the admission evaluation if the object being admitted is modified by other admission plugins after the initial webhook call.
  • Side effects:某些 webhooks 除了修改 AdmissionReview 的内容外,还会连带修改其它的资源("side effects")。而sideEffects指示了Webhooks是否具有"side effects",取值如下:
    • None: calling the webhook will have no side effects.
    • NoneOnDryRun: calling the webhook will possibly have side effects, but if a request with dryRun: true is sent to the webhook, the webhook will suppress the side effects (the webhook is dryRun-aware).

这里给出 edge-health-admission 对应的 MutatingWebhookConfiguration 作为参考示例:

代码语言:javascript
复制
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: edge-health-admission
webhooks:
  - admissionReviewVersions:
      - v1
    clientConfig:
      caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNwRENDQVl3Q0NRQ2RaL0w2akZSSkdqQU5CZ2txaGtpRzl3MEJBUXNGQURBVU1SSXdFQVlEVlFRRERBbFgKYVhObE1tTWdRMEV3SGhjTk1qQXdOekU0TURRek9ERTNXaGNOTkRjeE1qQTBNRFF6T0RFM1dqQVVNUkl3RUFZRApWUVFEREFsWGFYTmxNbU1nUTBFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUJBUUNSCnhHT2hrODlvVkRHZklyVDBrYVkwajdJQVJGZ2NlVVFmVldSZVhVcjh5eEVOQkF6ZnJNVVZyOWlCNmEwR2VFL3cKZzdVdW8vQWtwUEgrbzNQNjFxdWYrTkg1UDBEWHBUd1pmWU56VWtyaUVja3FOSkYzL2liV0o1WGpFZUZSZWpidgpST1V1VEZabmNWOVRaeTJISVF2UzhTRzRBTWJHVmptQXlDMStLODBKdDI3QUl4YmdndmVVTW8xWFNHYnRxOXlJCmM3Zk1QTXJMSHhaOUl5aTZla3BwMnJrNVdpeU5YbXZhSVA4SmZMaEdnTU56YlJaS1RtL0ZKdDdyV0dhQ1orNXgKV0kxRGJYQ2MyWWhmbThqU1BqZ3NNQTlaNURONDU5ellJSkVhSTFHeFI3MlhaUVFMTm8zdE5jd3IzVlQxVlpiTgo1cmhHQlVaTFlrMERtd25vWTBCekFnTUJBQUV3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUhuUDJibnJBcWlWCjYzWkpMVzM0UWFDMnRreVFScTNVSUtWR3RVZHFobWRVQ0I1SXRoSUlleUdVRVdqVExpc3BDQzVZRHh4YVdrQjUKTUxTYTlUY0s3SkNOdkdJQUdQSDlILzRaeXRIRW10aFhiR1hJQ3FEVUVmSUVwVy9ObUgvcnBPQUxhYlRvSUVzeQpVNWZPUy9PVVZUM3ZoSldlRjdPblpIOWpnYk1SZG9zVElhaHdQdTEzZEtZMi8zcEtxRW1Cd1JkbXBvTExGbW9MCmVTUFQ4SjREZExGRkh2QWJKalFVbjhKQTZjOHUrMzZJZDIrWE1sTGRZYTdnTnhvZTExQTl6eFJQczRXdlpiMnQKUXZpbHZTbkFWb0ZUSVozSlpjRXVWQXllNFNRY1dKc3FLMlM0UER1VkNFdlg0SmRCRlA2NFhvU08zM3pXaWhtLworMXg3OXZHMUpFcz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
      service:
        namespace: kube-system
        name: edge-health-admission
        path: /node-taint
    failurePolicy: Ignore
    matchPolicy: Exact
    name: node-taint.k8s.io
    namespaceSelector: {}
    objectSelector: {}
    reinvocationPolicy: Never
    rules:
      - apiGroups:
          - '*'
        apiVersions:
          - '*'
        operations:
          - UPDATE
        resources:
          - nodes
        scope: '*'
    sideEffects: None
    timeoutSeconds: 5
  - admissionReviewVersions:
      - v1
    clientConfig:
      caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUNwRENDQVl3Q0NRQ2RaL0w2akZSSkdqQU5CZ2txaGtpRzl3MEJBUXNGQURBVU1SSXdFQVlEVlFRRERBbFgKYVhObE1tTWdRMEV3SGhjTk1qQXdOekU0TURRek9ERTNXaGNOTkRjeE1qQTBNRFF6T0RFM1dqQVVNUkl3RUFZRApWUVFEREFsWGFYTmxNbU1nUTBFd2dnRWlNQTBHQ1NxR1NJYjNEUUVCQVFVQUE0SUJEd0F3Z2dFS0FvSUJBUUNSCnhHT2hrODlvVkRHZklyVDBrYVkwajdJQVJGZ2NlVVFmVldSZVhVcjh5eEVOQkF6ZnJNVVZyOWlCNmEwR2VFL3cKZzdVdW8vQWtwUEgrbzNQNjFxdWYrTkg1UDBEWHBUd1pmWU56VWtyaUVja3FOSkYzL2liV0o1WGpFZUZSZWpidgpST1V1VEZabmNWOVRaeTJISVF2UzhTRzRBTWJHVmptQXlDMStLODBKdDI3QUl4YmdndmVVTW8xWFNHYnRxOXlJCmM3Zk1QTXJMSHhaOUl5aTZla3BwMnJrNVdpeU5YbXZhSVA4SmZMaEdnTU56YlJaS1RtL0ZKdDdyV0dhQ1orNXgKV0kxRGJYQ2MyWWhmbThqU1BqZ3NNQTlaNURONDU5ellJSkVhSTFHeFI3MlhaUVFMTm8zdE5jd3IzVlQxVlpiTgo1cmhHQlVaTFlrMERtd25vWTBCekFnTUJBQUV3RFFZSktvWklodmNOQVFFTEJRQURnZ0VCQUhuUDJibnJBcWlWCjYzWkpMVzM0UWFDMnRreVFScTNVSUtWR3RVZHFobWRVQ0I1SXRoSUlleUdVRVdqVExpc3BDQzVZRHh4YVdrQjUKTUxTYTlUY0s3SkNOdkdJQUdQSDlILzRaeXRIRW10aFhiR1hJQ3FEVUVmSUVwVy9ObUgvcnBPQUxhYlRvSUVzeQpVNWZPUy9PVVZUM3ZoSldlRjdPblpIOWpnYk1SZG9zVElhaHdQdTEzZEtZMi8zcEtxRW1Cd1JkbXBvTExGbW9MCmVTUFQ4SjREZExGRkh2QWJKalFVbjhKQTZjOHUrMzZJZDIrWE1sTGRZYTdnTnhvZTExQTl6eFJQczRXdlpiMnQKUXZpbHZTbkFWb0ZUSVozSlpjRXVWQXllNFNRY1dKc3FLMlM0UER1VkNFdlg0SmRCRlA2NFhvU08zM3pXaWhtLworMXg3OXZHMUpFcz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=
      service:
        namespace: kube-system
        name: edge-health-admission
        path: /endpoint
    failurePolicy: Ignore
    matchPolicy: Exact
    name: endpoint.k8s.io
    namespaceSelector: {}
    objectSelector: {}
    reinvocationPolicy: Never
    rules:
      - apiGroups:
          - '*'
        apiVersions:
          - '*'
        operations:
          - UPDATE
        resources:
          - endpoints
        scope: '*'
    sideEffects: None
    timeoutSeconds: 5

kube-apiserver 会发送 AdmissionReview(apiGroup: admission.k8s.io,apiVersion:v1 or v1beta1)给 Webhooks,并封装成JSON格式,示例如下:

代码语言:javascript
复制
# This example shows the data contained in an AdmissionReview object for a request to update the scale subresource of an apps/v1 Deployment
{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "request": {
    # Random uid uniquely identifying this admission call
    "uid": "705ab4f5-6393-11e8-b7cc-42010a800002",
    # Fully-qualified group/version/kind of the incoming object
    "kind": {"group":"autoscaling","version":"v1","kind":"Scale"},
    # Fully-qualified group/version/kind of the resource being modified
    "resource": {"group":"apps","version":"v1","resource":"deployments"},
    # subresource, if the request is to a subresource
    "subResource": "scale",
    # Fully-qualified group/version/kind of the incoming object in the original request to the API server.
    # This only differs from `kind` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestKind": {"group":"autoscaling","version":"v1","kind":"Scale"},
    # Fully-qualified group/version/kind of the resource being modified in the original request to the API server.
    # This only differs from `resource` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestResource": {"group":"apps","version":"v1","resource":"deployments"},
    # subresource, if the request is to a subresource
    # This only differs from `subResource` if the webhook specified `matchPolicy: Equivalent` and the
    # original request to the API server was converted to a version the webhook registered for.
    "requestSubResource": "scale",
    # Name of the resource being modified
    "name": "my-deployment",
    # Namespace of the resource being modified, if the resource is namespaced (or is a Namespace object)
    "namespace": "my-namespace",
    # operation can be CREATE, UPDATE, DELETE, or CONNECT
    "operation": "UPDATE",
    "userInfo": {
      # Username of the authenticated user making the request to the API server
      "username": "admin",
      # UID of the authenticated user making the request to the API server
      "uid": "014fbff9a07c",
      # Group memberships of the authenticated user making the request to the API server
      "groups": ["system:authenticated","my-admin-group"],
      # Arbitrary extra info associated with the user making the request to the API server.
      # This is populated by the API server authentication layer and should be included
      # if any SubjectAccessReview checks are performed by the webhook.
      "extra": {
        "some-key":["some-value1", "some-value2"]
      }
    },
    # object is the new object being admitted.
    # It is null for DELETE operations.
    "object": {"apiVersion":"autoscaling/v1","kind":"Scale",...},
    # oldObject is the existing object.
    # It is null for CREATE and CONNECT operations.
    "oldObject": {"apiVersion":"autoscaling/v1","kind":"Scale",...},
    # options contains the options for the operation being admitted, like meta.k8s.io/v1 CreateOptions, UpdateOptions, or DeleteOptions.
    # It is null for CONNECT operations.
    "options": {"apiVersion":"meta.k8s.io/v1","kind":"UpdateOptions",...},
    # dryRun indicates the API request is running in dry run mode and will not be persisted.
    # Webhooks with side effects should avoid actuating those side effects when dryRun is true.
    # See http://k8s.io/docs/reference/using-api/api-concepts/#make-a-dry-run-request for more details.
    "dryRun": false
  }
}

而 Webhooks 需要向 kube-apiserver 回应具有相同版本的 AdmissionReview,并封装成 JSON 格式,包含如下关键字段:

  • uid:拷贝发送给 webhooks的AdmissionReview request.uid 字段;
  • allowed:true 表示准许;false 表示不准许;
  • status:当不准许请求时,可以通过 status 给出相关原因(http code and message);
  • patch:base64 编码,包含 mutating admission webhook 对请求对象的一系列JSON patch操作[4]
  • patchType:目前只支持 JSONPatch 类型。

示例如下:

代码语言:javascript
复制
# a webhook response to add that label would be:
{
  "apiVersion": "admission.k8s.io/v1",
  "kind": "AdmissionReview",
  "response": {
    "uid": "<value from request.uid>",
    "allowed": true,
    "patchType": "JSONPatch",
    "patch": "W3sib3AiOiAiYWRkIiwgInBhdGgiOiAiL3NwZWMvcmVwbGljYXMiLCAidmFsdWUiOiAzfV0="
  }
}

edge-health-admission 实际上就是一个 mutating admission webhook,选择性地对 endpoints 以及 node UPDATE 请求进行修改,下面将详细分析其原理。

edge-health-admission 源码分析

edge-health-admission 完全参考官方示例[5]编写,如下是监听入口:

代码语言:javascript
复制
func (eha *EdgeHealthAdmission) Run(stopCh <-chan struct{}) {
    if !cache.WaitForNamedCacheSync("edge-health-admission", stopCh, eha.cfg.NodeInformer.Informer().HasSynced) {
        return
    }
    http.HandleFunc("/node-taint", eha.serveNodeTaint)
    http.HandleFunc("/endpoint", eha.serveEndpoint)
    server := &http.Server{
        Addr: eha.cfg.Addr,
    }
    go func() {
        if err := server.ListenAndServeTLS(eha.cfg.CertFile, eha.cfg.KeyFile); err != http.ErrServerClosed {
            klog.Fatalf("ListenAndServeTLS err %+v", err)
        }
    }()
    for {
        select {
        case <-stopCh:
            ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
            defer cancel()
            if err := server.Shutdown(ctx); err != nil {
                klog.Errorf("Server: program exit, server exit error %+v", err)
            }
            return
        default:
        }
    }
}

这里会注册两种路由处理函数:

  • node-taint:对应处理函数 serveNodeTaint,负责对 node UPDATE 请求进行更改;
  • endpoint:对应处理函数 serveEndpoint,负责对 endpoints UPDATE 请求进行更改。

而这两个函数都会调用 serve 函数,如下:

代码语言:javascript
复制
// serve handles the http portion of a request prior to handing to an admit function
func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) {
    var body []byte
    if r.Body != nil {
        if data, err := ioutil.ReadAll(r.Body); err == nil {
            body = data
        }
    }
    // verify the content type is accurate
    contentType := r.Header.Get("Content-Type")
    if contentType != "application/json" {
        klog.Errorf("contentType=%s, expect application/json", contentType)
        return
    }
    klog.V(4).Info(fmt.Sprintf("handling request: %s", body))
    // The AdmissionReview that was sent to the webhook
    requestedAdmissionReview := admissionv1.AdmissionReview{}
    // The AdmissionReview that will be returned
    responseAdmissionReview := admissionv1.AdmissionReview{}
    deserializer := codecs.UniversalDeserializer()
    if _, _, err := deserializer.Decode(body, nil, &requestedAdmissionReview); err != nil {
        klog.Error(err)
        responseAdmissionReview.Response = toAdmissionResponse(err)
    } else {
        // pass to admitFunc
        responseAdmissionReview.Response = admit(requestedAdmissionReview)
    }
    // Return the same UID
    responseAdmissionReview.Response.UID = requestedAdmissionReview.Request.UID
    klog.V(4).Info(fmt.Sprintf("sending response: %+v", responseAdmissionReview.Response))
    respBytes, err := json.Marshal(responseAdmissionReview)
    if err != nil {
        klog.Error(err)
    }
    if _, err := w.Write(respBytes); err != nil {
        klog.Error(err)
    }
}

serve 逻辑如下所示:

  • 解析 request.Body为AdmissionReview 对象,并赋值给 requestedAdmissionReview.
  • 对 AdmissionReview 对象执行 admit 函数,并赋值给回 responseAdmissionReview.
  • 设置 responseAdmissionReview.Response.UID 为请求的 AdmissionReview.Request.UID.

其中serveNodeTaint 以及 serveEndpoint 对应的 admit 函数分别为:mutateNodeTaint 以及 mutateEndpoint,下面依次分析:

mutateNodeTaint

mutateNodeTaint 会对 node UPDATE 请求按照分布式健康检查结果进行修改:

代码语言:javascript
复制
func (eha *EdgeHealthAdmission) mutateNodeTaint(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
    klog.V(4).Info("mutating node taint")
    nodeResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
    if ar.Request.Resource != nodeResource {
        klog.Errorf("expect resource to be %s", nodeResource)
        return nil
    }
    var node corev1.Node
    deserializer := codecs.UniversalDeserializer()
    if _, _, err := deserializer.Decode(ar.Request.Object.Raw, nil, &node); err != nil {
        klog.Error(err)
        return toAdmissionResponse(err)
    }
    reviewResponse := admissionv1.AdmissionResponse{}
    reviewResponse.Allowed = true
    if index, condition := util.GetNodeCondition(&node.Status, v1.NodeReady); index != -1 && condition.Status == v1.ConditionUnknown {
        if node.Annotations != nil {
            var patches []*patch
            if healthy, existed := node.Annotations[common.NodeHealthAnnotation]; existed && healthy == common.NodeHealthAnnotationPros {
                if index, existed := util.TaintExistsPosition(node.Spec.Taints, common.UnreachableNoExecuteTaint); existed {
                    patches = append(patches, &patch{
                        OP:   "remove",
                        Path: fmt.Sprintf("/spec/taints/%d", index),
                    })
                    klog.V(4).Infof("UnreachableNoExecuteTaint: remove %d taints %s", index, node.Spec.Taints[index])
                }
            }
            if len(patches) > 0 {
                patchBytes, _ := json.Marshal(patches)
                reviewResponse.Patch = patchBytes
                pt := admissionv1.PatchTypeJSONPatch
                reviewResponse.PatchType = &pt
            }
        }
    }
    return &reviewResponse
}

主体逻辑如下:

  • 检查 AdmissionReview.Request.Resource 是否为 node 资源的 group/version/kind.
  • 将 AdmissionReview.Request.Object.Raw 转化为 node 对象
  • 设置 AdmissionReview.Response.Allowed 为 true,表示无论如何都准许该请求
  • 执行协助边端健康检查核心逻辑:在节点处于 ConditionUnknown 状态且分布式健康检查结果为正常的情况下,若节点存在 NoExecute(node.kubernetes.io/unreachable) taint,则将其移除

总的来说,mutateNodeTaint 的作用就是:不断修正被 kube-controller-manager 更新的节点状态,去掉 NoExecute(node.kubernetes.io/unreachable) taint,让节点不会被驱逐。

mutateEndpoint

mutateEndpoint 会对 endpoints UPDATE 请求按照分布式健康检查结果进行修改:

代码语言:javascript
复制
func (eha *EdgeHealthAdmission) mutateEndpoint(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
    klog.V(4).Info("mutating endpoint")
    endpointResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "endpoints"}
    if ar.Request.Resource != endpointResource {
        klog.Errorf("expect resource to be %s", endpointResource)
        return nil
    }
    var endpoint corev1.Endpoints
    deserializer := codecs.UniversalDeserializer()
    if _, _, err := deserializer.Decode(ar.Request.Object.Raw, nil, &endpoint); err != nil {
        klog.Error(err)
        return toAdmissionResponse(err)
    }
    reviewResponse := admissionv1.AdmissionResponse{}
    reviewResponse.Allowed = true
    for epSubsetIndex, epSubset := range endpoint.Subsets {
        for notReadyAddrIndex, EndpointAddress := range epSubset.NotReadyAddresses {
            if node, err := eha.nodeLister.Get(*EndpointAddress.NodeName); err == nil {
                if index, condition := util.GetNodeCondition(&node.Status, v1.NodeReady); index != -1 && condition.Status == v1.ConditionUnknown {
                    if node.Annotations != nil {
                        var patches []*patch
                        if healthy, existed := node.Annotations[common.NodeHealthAnnotation]; existed && healthy == common.NodeHealthAnnotationPros {
                            // TODO: handle readiness probes failure
                            // Remove address on node from endpoint notReadyAddresses
                            patches = append(patches, &patch{
                                OP:   "remove",
                                Path: fmt.Sprintf("/subsets/%d/notReadyAddresses/%d", epSubsetIndex, notReadyAddrIndex),
                            })
                            // Add address on node to endpoint readyAddresses
                            TargetRef := map[string]interface{}{}
                            TargetRef["kind"] = EndpointAddress.TargetRef.Kind
                            TargetRef["namespace"] = EndpointAddress.TargetRef.Namespace
                            TargetRef["name"] = EndpointAddress.TargetRef.Name
                            TargetRef["uid"] = EndpointAddress.TargetRef.UID
                            TargetRef["apiVersion"] = EndpointAddress.TargetRef.APIVersion
                            TargetRef["resourceVersion"] = EndpointAddress.TargetRef.ResourceVersion
                            TargetRef["fieldPath"] = EndpointAddress.TargetRef.FieldPath
                            patches = append(patches, &patch{
                                OP:   "add",
                                Path: fmt.Sprintf("/subsets/%d/addresses/0", epSubsetIndex),
                                Value: map[string]interface{}{
                                    "ip":        EndpointAddress.IP,
                                    "hostname":  EndpointAddress.Hostname,
                                    "nodeName":  EndpointAddress.NodeName,
                                    "targetRef": TargetRef,
                                },
                            })
                            if len(patches) != 0 {
                                patchBytes, _ := json.Marshal(patches)
                                reviewResponse.Patch = patchBytes
                                pt := admissionv1.PatchTypeJSONPatch
                                reviewResponse.PatchType = &pt
                            }
                        }
                    }
                }
            } else {
                klog.Errorf("Get pod's node err %+v", err)
            }
        }
    }
    return &reviewResponse
}

主体逻辑如下:

  • 检查 AdmissionReview.Request.Resource 是否为 endpoints 资源的 group/version/kind;
  • 将 AdmissionReview.Request.Object.Raw 转化为 endpoints 对象;
  • 设置 AdmissionReview.Response.Allowed 为 true,表示无论如何都准许该请求;
  • 遍历 endpoints.Subset.NotReadyAddresses,如果 EndpointAddress 所在节点处于 ConditionUnknown 状态且分布式健康检查结果为正常,则将该 EndpointAddress从endpoints.Subset.NotReadyAddresses 移到 endpoints.Subset.Addresses。

总的来说,mutateEndpoint 的作用就是:不断修正被 kube-controller-manager 更新的 endpoints 状态,将分布式健康检查正常节点上的负载从 endpoints.Subset.NotReadyAddresses 移到 endpoints.Subset.Addresses 中,让服务依旧可用。

总结

  • SuperEdge 分布式健康检查功能由边端的 edge-health-daemon 以及云端的 edge-health-admission 组成:
    • edge-health-daemon:对同区域边缘节点执行分布式健康检查,并向 apiserver 发送健康状态投票结果(给 node 打 annotation);
    • edge-health-admission:不断根据 node edge-health annotation 调整 kube-controller-manager 设置的 node taint (去掉 NoExecute taint)以及 endpoints(将失联节点上的 pods从endpoint subsets notReadyAddresses 移到 addresses 中),从而实现云端和边端共同决定节点状态。
  • 之所以创建 edge-health-admission 云端组件,是因为当云边断连时,kube-controller-manager 会将失联的节点置为 ConditionUnknown 状态,并添加 NoSchedule 和 NoExecute 的 taints;同时失联的节点上的pod从 Service 的 Endpoint 列表中移除。当 edge-health-daemon 在边端根据健康检查判断节点状态正常时,会更新 node:去掉 NoExecute taint。但是在node 成功更新之后又会被 kube-controller-manager 给刷回去(再次添加NoExecute taint),因此必须添加 Kubernetes mutating admission webhook 也即 edge-health-admission,将 kube-controller-manager 对node api resource 的更改做调整,最终实现分布式健康检查效果。
  • Kubernetes Admission Controllers 是 kube-apiserver 处理api请求的某个环节,用于在api请求认证&鉴权之后,对象持久化之前进行调用,对请求进行校验或者修改(or both);包括多种 admission,大多数都内嵌在 kube-apiserver 代码中了。其中 MutatingAdmissionWebhook 以及 ValidatingAdmissionWebhook controller 比较特殊,它们分别会调用外部构造的 mutating admission control webhooks 以及 validating admission control webhooks。
  • Admission Webhooks 是一个HTTP回调服务,接受 AdmissionReview 请求并进行处理,按照处理方式的不同,可以将 Admission Webhooks 分类如下:
    • validating admission webhook[6]:通过 ValidatingWebhookConfiguration 配置,会对api请求进行准入校验,但是不能修改请求对象;
    • mutating admission webhook[7]:通过 MutatingWebhookConfiguration 配置,会对 api请求进行准入校验以及修改请求对象。
  • kube-apiserver 会发送 AdmissionReview(apiGroup: admission.k8s.io,apiVersion:v1 or v1beta1)给 Webhooks,并封装成 JSON 格式;而 Webhooks 需要向 kube-apiserver 回应具有相同版本的 AdmissionReview,并封装成JSON格式,包含如下关键字段:
    • uid:拷贝发送给 webhooks的AdmissionReview request.uid 字段;
    • allowed:true 表示准许;false 表示不准许;
    • status:当不准许请求时,可以通过 status 给出相关原因(http code and message);
    • patch:base64编码,包含 mutating admission webhook 对请求对象的一系列 JSON patch 操作;
    • patchType:目前只支持 JSONPatch 类型。
  • edge-health-admission 实际上就是一个 mutating admission webhook,选择性地对 endpoints 以及 node UPDATE 请求进行修改,包含如下处理逻辑:
    • mutateNodeTaint:不断修正被 kube-controller-manager 更新的节点状态,去掉 NoExecute(node.kubernetes.io/unreachable) taint,让节点不会被驱逐;
    • mutateEndpoint:不断修正被 kube-controller-manager 更新的 endpoints 状态,将分布式健康检查正常节点上的负载从 endpoints.Subset.NotReadyAddresses 移到 endpoints.Subset.Addresses 中,让服务依旧可用。

参考资料

[1]

Kubernetes Admission Controllers: 【https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/】

[2]

validating admission webhook: 【https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#validatingadmissionwebhook】

[3]

mutating admission webhook: 【https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#mutatingadmissionwebhook】

[4]

JSON patch操作: 【https://jsonpatch.com/】

[5]

官方示例: 【https://github.com/kubernetes/kubernetes/blob/v1.13.0/test/images/webhook/main.go】

[6]

validating admission webhook: 【https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#validatingadmissionwebhook】

[7]

mutating admission webhook: 【https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#mutatingadmissionwebhook】

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云原生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • SuperEdge 介绍
      • SuperEdge 分布式健康检查云端组件
      • edge-health-admission 源码分析
        • mutateNodeTaint
          • mutateEndpoint
          • 总结
            • 参考资料
            相关产品与服务
            容器服务
            腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档