
Does the k8s APIServer need DNS resolution to call a webhook?

Author: 我的小碗汤
Published 2021-07-14 13:33:43

Background

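Recently the coredns Pod in our test environment went down, yet the k8s APIServer kept calling webhooks normally. That was puzzling. Does the APIServer not need coredns to resolve the Service that a webhook points to? Does it get the Service ClusterIP, or the Pod IPs from the Endpoints, directly? With this question in mind, let's dig into the source code the APIServer uses to call webhooks.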

k8s version: 1.18.14

Deployment: kube-apiserver runs as a binary managed by systemd.

Webhook example

```shell
$ kubectl get mutatingwebhookconfigurations.admissionregistration.k8s.io rocketmq-operator-mutating-webhook-configuration -oyaml
```

```yaml
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  annotations:
    cert-manager.io/inject-ca-from: rocketmq/rocketmq-operator-serving-cert
  name: rocketmq-operator-mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
  - v1
  - v1beta1
  clientConfig:
    caBundle: LS0tLSXXX
    service:
      name: rocketmq-operator-webhook-service
      namespace: rocketmq
      path: /mutate-rocketmq-apache-org-v1alpha1-rocketmqcluster
      port: 443
  failurePolicy: Fail
  matchPolicy: Equivalent
  name: mrocketmqcluster.kb.io
  namespaceSelector: {}
  objectSelector: {}
  reinvocationPolicy: Never
  rules:
  - apiGroups:
    - rocketmq.apache.org
    apiVersions:
    - v1alpha1
    operations:
    - CREATE
    - UPDATE
    resources:
    - rocketmqclusters
    scope: '*'
  sideEffects: None
  timeoutSeconds: 10
```

Source code analysis

In the RegisterAllAdmissionPlugins function in kubernetes/staging/src/k8s.io/apiserver/pkg/server/plugins.go:

```go
// RegisterAllAdmissionPlugins registers all admission plugins
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
  lifecycle.Register(plugins)
  validatingwebhook.Register(plugins)
  mutatingwebhook.Register(plugins)
}
```

Three admission plugins are registered: lifecycle, validatingwebhook and mutatingwebhook.

Only mutatingwebhook is covered here. The others will be discussed in later articles.

In kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go:

```go
// NewMutatingWebhook returns a generic admission webhook plugin.
func NewMutatingWebhook(configFile io.Reader) (*Plugin, error) {
  handler := admission.NewHandler(admission.Connect, admission.Create, admission.Delete, admission.Update)
  p := &Plugin{}
  var err error
  p.Webhook, err = generic.NewWebhook(handler, configFile, configuration.NewMutatingWebhookConfigurationManager, newMutatingDispatcher(p))
  if err != nil {
    return nil, err
  }

  return p, nil
}
```

NewWebhook, called here, is defined in kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/generic/webhook.go:

```go
func NewWebhook(handler *admission.Handler, configFile io.Reader, sourceFactory sourceFactory, dispatcherFactory dispatcherFactory) (*Webhook, error) {
  kubeconfigFile, err := config.LoadConfig(configFile)
  // ...omitted...
  // Set defaults which may be overridden later.
  cm.SetServiceResolver(webhookutil.NewDefaultServiceResolver())

  return &Webhook{
    Handler:          handler,
    sourceFactory:    sourceFactory,
    clientManager:    &cm,
    namespaceMatcher: &namespace.Matcher{},
    objectMatcher:    &object.Matcher{},
    dispatcher:       dispatcherFactory(&cm),
  }, nil
}
```

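By default, the service resolver is set to defaultServiceResolver from kubernetes/staging/src/k8s.io/apiserver/pkg/util/webhook/serviceresolver.go. As the comment above says, this default may be overridden later: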

```go
// ResolveEndpoint constructs a service URL from a given namespace and name.
// Note that name, namespace, and port are required, and by default all
// created addresses use the HTTPS scheme.
// For example: name=ross namespace=andromeda resolves to https://ross.andromeda.svc:443
func (sr defaultServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
  if len(name) == 0 || len(namespace) == 0 || port == 0 {
    return nil, errors.New("cannot resolve an empty service name or namespace or port")
  }
  return &url.URL{Scheme: "https", Host: fmt.Sprintf("%s.%s.svc:%d", name, namespace, port)}, nil
}
```
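Here is a standalone copy of that logic, to show what the default would do on its own. The name resolveDefault is ours, not Kubernetes'. The resolver only formats a DNS name. If it stayed in place, the Dial wrapper discussed later would pass "<name>.<namespace>.svc" to the dialer, and DNS would be needed. This is an illustrative sketch, not the library code.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// resolveDefault mirrors defaultServiceResolver.ResolveEndpoint: it builds a
// URL whose host is a cluster DNS name, without looking anything up.
func resolveDefault(namespace, name string, port int32) (*url.URL, error) {
	if len(name) == 0 || len(namespace) == 0 || port == 0 {
		return nil, errors.New("cannot resolve an empty service name or namespace or port")
	}
	return &url.URL{Scheme: "https", Host: fmt.Sprintf("%s.%s.svc:%d", name, namespace, port)}, nil
}

func main() {
	u, err := resolveDefault("rocketmq", "rocketmq-operator-webhook-service", 443)
	if err != nil {
		panic(err)
	}
	fmt.Println(u.String()) // https://rocketmq-operator-webhook-service.rocketmq.svc:443
}
```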

configuration.NewMutatingWebhookConfigurationManager registers informer callbacks (AddFunc, UpdateFunc, DeleteFunc) that fire when MutatingWebhookConfiguration objects change. Each callback calls the updateConfiguration method of mutatingWebhookConfigurationManager.

The code is in kubernetes/staging/src/k8s.io/apiserver/pkg/admission/configuration/mutating_webhook_manager.go.

updateConfiguration then calls mergeMutatingWebhookConfigurations, which generates an accessor for each webhook:

```go
func mergeMutatingWebhookConfigurations(configurations []*v1.MutatingWebhookConfiguration) []webhook.WebhookAccessor {
  // The internal order of webhooks for each configuration is provided by the user,
  // but configurations themselves can be in any order. As we are going to run these
  // webhooks in serial, they are sorted here to have a deterministic order.
  sort.SliceStable(configurations, MutatingWebhookConfigurationSorter(configurations).ByName)
  accessors := []webhook.WebhookAccessor{}
  for _, c := range configurations {
    // Webhook names are not validated for uniqueness, so we check for duplicates
    // and add an int suffix to distinguish between them.
    names := map[string]int{}
    for i := range c.Webhooks {
      n := c.Webhooks[i].Name
      uid := fmt.Sprintf("%s/%s/%d", c.Name, n, names[n])
      names[n]++
      accessors = append(accessors, webhook.NewMutatingWebhookAccessor(uid, c.Name, &c.Webhooks[i]))
    }
  }
  return accessors
}
```
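The ordering and naming rules above can be reproduced with simplified types. In the sketch below, config and hook are hypothetical stand-ins for MutatingWebhookConfiguration and MutatingWebhook, and they keep only the names. The output shows the uid format <configuration>/<webhook>/<n>.

```go
package main

import (
	"fmt"
	"sort"
)

// hook and config are hypothetical, trimmed-down stand-ins for the real API types.
type hook struct{ Name string }

type config struct {
	Name     string
	Webhooks []hook
}

// accessorUIDs applies the same rules as mergeMutatingWebhookConfigurations:
// sort configurations by name, keep the user-given webhook order inside each,
// and suffix duplicate webhook names with an increasing counter.
func accessorUIDs(configs []config) []string {
	sort.SliceStable(configs, func(i, j int) bool { return configs[i].Name < configs[j].Name })
	var uids []string
	for _, c := range configs {
		seen := map[string]int{}
		for _, h := range c.Webhooks {
			uids = append(uids, fmt.Sprintf("%s/%s/%d", c.Name, h.Name, seen[h.Name]))
			seen[h.Name]++
		}
	}
	return uids
}

func main() {
	uids := accessorUIDs([]config{
		{Name: "b-config", Webhooks: []hook{{Name: "x.kb.io"}}},
		{Name: "a-config", Webhooks: []hook{{Name: "m.kb.io"}, {Name: "m.kb.io"}}},
	})
	fmt.Println(uids) // [a-config/m.kb.io/0 a-config/m.kb.io/1 b-config/x.kb.io/0]
}
```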

The returned accessors are stored in an atomic.Value and loaded later. When a mutating webhook call happens, the Admit method in kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/plugin.go is executed:

```go
// Admit makes an admission decision based on the request attributes.
func (a *Plugin) Admit(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) error {
  return a.Webhook.Dispatch(ctx, attr, o)
}
```
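The Store/Load handoff through atomic.Value can be sketched as follows. The sketch uses a plain string slice, hookList (a hypothetical name), in place of []webhook.WebhookAccessor. Informer callbacks replace the whole snapshot, and the admission path reads the current one without taking a lock. Storing an empty list up front keeps the type assertion on Load safe before the first update arrives.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// hookList is a hypothetical stand-in for []webhook.WebhookAccessor.
type hookList []string

// manager holds the current snapshot of merged webhooks.
type manager struct {
	configuration atomic.Value // always holds a hookList
}

func newManager() *manager {
	m := &manager{}
	m.configuration.Store(hookList{}) // so Load never returns nil
	return m
}

// updateConfiguration is what the Add/Update/Delete callbacks would call.
func (m *manager) updateConfiguration(merged hookList) {
	m.configuration.Store(merged)
}

// webhooks is what Dispatch would call to get the current snapshot.
func (m *manager) webhooks() hookList {
	return m.configuration.Load().(hookList)
}

func main() {
	m := newManager()
	fmt.Println(len(m.webhooks())) // 0
	m.updateConfiguration(hookList{"rocketmq-operator-mutating-webhook-configuration/mrocketmqcluster.kb.io/0"})
	fmt.Println(m.webhooks())
}
```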

Inside Webhook.Dispatch, the accessors are loaded and passed to the Dispatch method of mutatingDispatcher, located in kubernetes/staging/src/k8s.io/apiserver/pkg/admission/plugin/webhook/mutating/dispatcher.go:

```go
func (a *mutatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
  // ...omitted...
  changed, err := a.callAttrMutatingHook(ctx, hook, invocation, versionedAttr, o, round, i)
  // ...omitted...
  return nil
}

func (a *mutatingDispatcher) callAttrMutatingHook(ctx context.Context, h *admissionregistrationv1.MutatingWebhook, invocation *generic.WebhookInvocation, attr *generic.VersionedAttributes, o admission.ObjectInterfaces, round, idx int) (bool, error) {
  configurationName := invocation.Webhook.GetConfigurationName()
  annotator := newWebhookAnnotator(attr, round, idx, h.Name, configurationName)
  changed := false
  defer func() { annotator.addMutationAnnotation(changed) }()
  if attr.Attributes.IsDryRun() {
    if h.SideEffects == nil {
      return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("Webhook SideEffects is nil")}
    }
    if !(*h.SideEffects == admissionregistrationv1.SideEffectClassNone || *h.SideEffects == admissionregistrationv1.SideEffectClassNoneOnDryRun) {
      return false, webhookerrors.NewDryRunUnsupportedErr(h.Name)
    }
  }

  uid, request, response, err := webhookrequest.CreateAdmissionObjects(attr, invocation)
  if err != nil {
    return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
  }
  // Make the webhook request
  client, err := invocation.Webhook.GetRESTClient(a.cm)
  if err != nil {
    return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
  }

  // ...omitted (this is where trace and the request r used below are set up)...

  if err := r.Do(ctx).Into(response); err != nil {
    return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
  }
  trace.Step("Request completed")

  result, err := webhookrequest.VerifyAdmissionResponse(uid, true, response)
  if err != nil {
    return false, &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
  }

  // ...omitted...
  return changed, nil
}
```
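The dry-run guard at the top of callAttrMutatingHook reduces to a small rule. The sketch below uses a local sideEffectClass type, an assumption that mirrors the string values of admissionregistration/v1 SideEffectClass. The example webhook declares sideEffects: None, so under this rule it would also be called for dry-run requests.

```go
package main

import (
	"errors"
	"fmt"
)

// sideEffectClass is a local stand-in for admissionregistrationv1.SideEffectClass.
type sideEffectClass string

const (
	sideEffectUnknown      sideEffectClass = "Unknown"
	sideEffectSome         sideEffectClass = "Some"
	sideEffectNone         sideEffectClass = "None"
	sideEffectNoneOnDryRun sideEffectClass = "NoneOnDryRun"
)

// checkDryRun restates the guard: a dry-run request may only reach webhooks
// that declare None or NoneOnDryRun; non-dry-run requests are not affected.
func checkDryRun(isDryRun bool, se *sideEffectClass) error {
	if !isDryRun {
		return nil
	}
	if se == nil {
		return errors.New("webhook SideEffects is nil")
	}
	if *se != sideEffectNone && *se != sideEffectNoneOnDryRun {
		return fmt.Errorf("webhook does not support dry run (sideEffects=%s)", *se)
	}
	return nil
}

func main() {
	none := sideEffectNone
	some := sideEffectSome
	fmt.Println(checkDryRun(true, &none)) // <nil>
	fmt.Println(checkDryRun(true, &some))
}
```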

The key step above is building the client, i.e. the call invocation.Webhook.GetRESTClient(a.cm). For a mutating webhook, the accessor is a mutatingWebhookAccessor, which implements the WebhookAccessor interface:

```go
func (m *mutatingWebhookAccessor) GetRESTClient(clientManager *webhookutil.ClientManager) (*rest.RESTClient, error) {
  m.initClient.Do(func() {
    m.client, m.clientErr = clientManager.HookClient(hookClientConfigForWebhook(m))
  })
  return m.client, m.clientErr
}
```
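GetRESTClient relies on sync.Once. Each accessor therefore builds its client at most once and then keeps returning the cached client and error. Below is a minimal sketch. A string stands in for *rest.RESTClient, and the builds counter exists only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// accessor is a hypothetical stand-in for mutatingWebhookAccessor.
type accessor struct {
	initClient sync.Once
	client     string
	clientErr  error
	builds     int // how many times the builder actually ran
}

// getClient runs build at most once; every later call returns the cached
// result, including a cached error.
func (a *accessor) getClient(build func() (string, error)) (string, error) {
	a.initClient.Do(func() {
		a.builds++
		a.client, a.clientErr = build()
	})
	return a.client, a.clientErr
}

func main() {
	a := &accessor{}
	build := func() (string, error) { return "client-for-rocketmq-operator-webhook-service", nil }
	for i := 0; i < 3; i++ {
		c, _ := a.getClient(build)
		fmt.Println(c)
	}
	fmt.Println("builds:", a.builds) // builds: 1
}
```

mergeMutatingWebhookConfigurations creates new accessors on every configuration update. Reading the code above, a cached error should therefore only last until the configuration changes.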

ClientManager's HookClient method looks like this:

```go
// HookClient gets a RESTClient from the cache, or constructs one based on the webhook configuration.
func (cm *ClientManager) HookClient(cc ClientConfig) (*rest.RESTClient, error) {
  ccWithNoName := cc
  ccWithNoName.Name = ""
  cacheKey, err := json.Marshal(ccWithNoName)
  if err != nil {
    return nil, err
  }
  if client, ok := cm.cache.Get(string(cacheKey)); ok {
    return client.(*rest.RESTClient), nil
  }

  complete := func(cfg *rest.Config) (*rest.RESTClient, error) {
    // Avoid client-side rate limiting when talking to the webhook backend.
    // Rate limiting should happen when deciding how many requests to serve.
    cfg.QPS = -1

    // ...omitted...

    cfg.ContentConfig.NegotiatedSerializer = cm.negotiatedSerializer
    cfg.ContentConfig.ContentType = runtime.ContentTypeJSON
    client, err := rest.UnversionedRESTClientFor(cfg)
    if err == nil {
      cm.cache.Add(string(cacheKey), client)
    }
    return client, err
  }

  if cc.Service != nil {
    port := cc.Service.Port
    if port == 0 {
      // Default to port 443 if no service port is specified
      port = 443
    }

    restConfig, err := cm.authInfoResolver.ClientConfigForService(cc.Service.Name, cc.Service.Namespace, int(port))
    if err != nil {
      return nil, err
    }
    cfg := rest.CopyConfig(restConfig)
    serverName := cc.Service.Name + "." + cc.Service.Namespace + ".svc"

    host := net.JoinHostPort(serverName, strconv.Itoa(int(port)))
    cfg.Host = "https://" + host
    cfg.APIPath = cc.Service.Path
    // Set the server name if not already set
    if len(cfg.TLSClientConfig.ServerName) == 0 {
      cfg.TLSClientConfig.ServerName = serverName
    }

    delegateDialer := cfg.Dial
    if delegateDialer == nil {
      var d net.Dialer
      delegateDialer = d.DialContext
    }
    cfg.Dial = func(ctx context.Context, network, addr string) (net.Conn, error) {
      if addr == host {
        u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
        if err != nil {
          return nil, err
        }
        addr = u.Host
      }
      return delegateDialer(ctx, network, addr)
    }

    return complete(cfg)
  }

  if cc.URL == "" {
    return nil, &ErrCallingWebhook{WebhookName: cc.Name, Reason: errors.New("webhook configuration must have either service or URL")}
  }

  u, err := url.Parse(cc.URL)
  if err != nil {
    return nil, &ErrCallingWebhook{WebhookName: cc.Name, Reason: fmt.Errorf("Unparsable URL: %v", err)}
  }

  hostPort := u.Host
  if len(u.Port()) == 0 {
    // Default to port 443 if no port is specified
    hostPort = net.JoinHostPort(hostPort, "443")
  }

  restConfig, err := cm.authInfoResolver.ClientConfigFor(hostPort)
  if err != nil {
    return nil, err
  }

  cfg := rest.CopyConfig(restConfig)
  cfg.Host = u.Scheme + "://" + u.Host
  cfg.APIPath = u.Path

  return complete(cfg)
}
```

This method builds the webhook client. It first prepares a rest.Config with fields such as cfg.Host, cfg.APIPath and cfg.Dial. The complete function then passes that config to rest.UnversionedRESTClientFor(cfg).

For the example webhook, the resolved values worth noting are:

cfg.Host:

https://rocketmq-operator-webhook-service.rocketmq.svc:443

cfg.APIPath:

/mutate-rocketmq-apache-org-v1alpha1-rocketmqcluster
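The sketch below recomputes these two values, plus the TLS server name, from a simplified client config. serviceRef and clientConfig are hypothetical stand-ins for the webhookutil types and omit fields such as the CA bundle. The sketch also mimics HookClient's cache key, which is the JSON of the config with Name cleared. Configs that differ only in Name therefore map to the same cached RESTClient.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// Hypothetical, simplified versions of the webhookutil client config types.
type serviceRef struct {
	Name      string
	Namespace string
	Port      int32
	Path      string
}

type clientConfig struct {
	Name    string
	Service *serviceRef
}

// serviceTarget derives host, API path and TLS server name the way the
// service branch of HookClient does.
func serviceTarget(cc clientConfig) (host, apiPath, serverName string) {
	port := cc.Service.Port
	if port == 0 {
		port = 443 // same default as HookClient
	}
	serverName = cc.Service.Name + "." + cc.Service.Namespace + ".svc"
	host = "https://" + net.JoinHostPort(serverName, strconv.Itoa(int(port)))
	return host, cc.Service.Path, serverName
}

// cacheKey JSON-encodes the config with Name cleared (cc is a copy, so the
// caller's value is untouched).
func cacheKey(cc clientConfig) (string, error) {
	cc.Name = ""
	b, err := json.Marshal(cc)
	return string(b), err
}

func main() {
	cc := clientConfig{
		Name: "mrocketmqcluster.kb.io",
		Service: &serviceRef{
			Name:      "rocketmq-operator-webhook-service",
			Namespace: "rocketmq",
			Port:      443,
			Path:      "/mutate-rocketmq-apache-org-v1alpha1-rocketmqcluster",
		},
	}
	host, apiPath, serverName := serviceTarget(cc)
	fmt.Println(host)
	fmt.Println(apiPath)
	fmt.Println(serverName)
}
```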

cfg.Dial deserves the most attention. It is set to the following function:

```go
func(ctx context.Context, network, addr string) (net.Conn, error) {
  if addr == host {
    u, err := cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
    if err != nil {
      return nil, err
    }
    addr = u.Host
  }
  return delegateDialer(ctx, network, addr)
}
```

This cfg.Dial eventually becomes the DialContext of the http.Transport. See the get method of tlsTransportCache in kubernetes/staging/src/k8s.io/client-go/transport/cache.go:

```go
func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
  key, canCache, err := tlsConfigKey(config)
  if err != nil {
    return nil, err
  }

  // ...omitted...

  // Get the TLS options for this client config
  tlsConfig, err := TLSConfigFor(config)
  if err != nil {
    return nil, err
  }
  // ...omitted...
  dial := config.Dial
  if dial == nil {
    dial = (&net.Dialer{
      Timeout:   30 * time.Second,
      KeepAlive: 30 * time.Second,
    }).DialContext
  }

  // ...omitted...
  transport := utilnet.SetTransportDefaults(&http.Transport{
    Proxy:               http.ProxyFromEnvironment,
    TLSHandshakeTimeout: 10 * time.Second,
    TLSClientConfig:     tlsConfig,
    MaxIdleConnsPerHost: idleConnsPerHost,
    DialContext:         dial,
    DisableCompression:  config.DisableCompression,
  })

  if canCache {
    // Cache a single transport for these options
    c.transports[key] = transport
  }

  return transport, nil
}
```

Inside the cfg.Dial function highlighted above, the target address comes from:

```go
cm.serviceResolver.ResolveEndpoint(cc.Service.Namespace, cc.Service.Name, port)
```

In kube-apiserver, serviceResolver is implemented by loopbackResolver, in kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go:

```go
type loopbackResolver struct {
  delegate ServiceResolver
  host     *url.URL
}

func (r *loopbackResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
  if namespace == "default" && name == "kubernetes" && port == 443 {
    return r.host, nil
  }
  return r.delegate.ResolveEndpoint(namespace, name, port)
}
```

When kube-apiserver is started with --enable-aggregator-routing=true, the delegate is aggregatorEndpointRouting, also in kubernetes/staging/src/k8s.io/kube-aggregator/pkg/apiserver/resolvers.go:

```go
type aggregatorEndpointRouting struct {
  services  listersv1.ServiceLister
  endpoints listersv1.EndpointsLister
}

func (r *aggregatorEndpointRouting) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
  return proxy.ResolveEndpoint(r.services, r.endpoints, namespace, name, port)
}
```

proxy.ResolveEndpoint looks like this:

```go
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string, port int32) (*url.URL, error) {
  svc, err := services.Services(namespace).Get(id)
  if err != nil {
    return nil, err
  }

  svcPort, err := findServicePort(svc, port)
  if err != nil {
    return nil, err
  }

  switch {
  case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort:
    // these are fine
  default:
    return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
  }

  eps, err := endpoints.Endpoints(namespace).Get(svc.Name)
  if err != nil {
    return nil, err
  }
  if len(eps.Subsets) == 0 {
    return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name))
  }

  // Pick a random Subset to start searching from.
  ssSeed := rand.Intn(len(eps.Subsets))

  // Find a Subset that has the port.
  for ssi := 0; ssi < len(eps.Subsets); ssi++ {
    ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)]
    if len(ss.Addresses) == 0 {
      continue
    }
    for i := range ss.Ports {
      if ss.Ports[i].Name == svcPort.Name {
        // Pick a random address.
        ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP
        port := int(ss.Ports[i].Port)
        return &url.URL{
          Scheme: "https",
          Host:   net.JoinHostPort(ip, strconv.Itoa(port)),
        }, nil
      }
    }
  }
  return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", id))
}
```

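So both the Service and its Endpoints come from the lister (informer) cache, and the result is https://<podIP>:<port>. The port is the one recorded in the Endpoints, i.e. the Pod-side targetPort, which is not necessarily the Service port 443. Without --enable-aggregator-routing (it defaults to false), the delegate is aggregatorClusterRouting instead. For ClusterIP, NodePort and LoadBalancer Services, it reads only the Service from the lister and returns https://<ClusterIP>:<Service port>. Neither path queries DNS.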
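The selection loop can be isolated with trimmed-down, hypothetical Endpoints types. The example assumes an unnamed Service port 443 whose Pods listen on 9443; these are illustrative values, not taken from the rocketmq operator. The chosen URL carries 9443, the Endpoints port.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"net"
	"net/url"
	"strconv"
)

// Hypothetical, simplified versions of the core/v1 Endpoints types.
type endpointPort struct {
	Name string
	Port int32
}

type endpointSubset struct {
	Addresses []string // Pod IPs
	Ports     []endpointPort
}

// pickEndpoint follows proxy.ResolveEndpoint: start at a random subset, skip
// subsets with no addresses, match the port by the Service port's name, then
// pick a random address. The port returned is the Endpoints (Pod-side) port.
func pickEndpoint(subsets []endpointSubset, svcPortName string) (*url.URL, error) {
	if len(subsets) == 0 {
		return nil, errors.New("no endpoints available")
	}
	seed := rand.Intn(len(subsets))
	for i := 0; i < len(subsets); i++ {
		ss := subsets[(seed+i)%len(subsets)]
		if len(ss.Addresses) == 0 {
			continue
		}
		for _, p := range ss.Ports {
			if p.Name == svcPortName {
				ip := ss.Addresses[rand.Intn(len(ss.Addresses))]
				return &url.URL{Scheme: "https", Host: net.JoinHostPort(ip, strconv.Itoa(int(p.Port)))}, nil
			}
		}
	}
	return nil, errors.New("no endpoints available")
}

func main() {
	subsets := []endpointSubset{{Addresses: []string{"10.244.1.23"}, Ports: []endpointPort{{Name: "", Port: 9443}}}}
	u, err := pickEndpoint(subsets, "")
	if err != nil {
		panic(err)
	}
	fmt.Println(u) // https://10.244.1.23:9443
}
```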

Summary

How DialContext works inside Go's http.Transport is not covered here.

Why the resolver chain is loopbackResolver->aggregatorEndpointRouting will be explained in a later article on the webhook source code.

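This article only establishes that, when calling a webhook, the APIServer gets the webhook Pod IP directly instead of asking coredns. To some extent, this decouples the APIServer from coredns.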

In fact, APIServices (aggregated APIs) follow the same process as webhooks.

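Even when the APIServer runs as a static Pod managed by kubelet, the resulting mirror Pod still has spec.dnsPolicy set to ClusterFirst. Because that Pod uses hostNetwork, however, its /etc/resolv.conf is inherited from the host, so name resolution would not go through coredns anyway.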
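The DNS rule behind this point, as described in the Kubernetes documentation on Pod DNS policies, can be summarized as a small function. effectiveDNS is a hypothetical model for illustration, not kubelet code. An empty policy is treated as ClusterFirst, the default.

```go
package main

import "fmt"

// effectiveDNS models where a Pod's resolver configuration comes from.
func effectiveDNS(dnsPolicy string, hostNetwork bool) string {
	switch dnsPolicy {
	case "ClusterFirstWithHostNet":
		return "cluster DNS"
	case "ClusterFirst", "":
		if hostNetwork {
			// ClusterFirst together with hostNetwork falls back to the Default policy.
			return "node resolv.conf"
		}
		return "cluster DNS"
	case "Default":
		return "node resolv.conf"
	case "None":
		return "pod dnsConfig"
	default:
		return "unknown policy"
	}
}

func main() {
	// The static-Pod APIServer case: ClusterFirst + hostNetwork.
	fmt.Println(effectiveDNS("ClusterFirst", true)) // node resolv.conf
}
```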

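All of the above assumes the webhook's clientConfig specifies a service. If it specifies a url instead, things are different. The url branch of HookClient does not go through serviceResolver, so whether DNS is needed depends on the host in the URL.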
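For comparison, the url branch of HookClient only normalizes host and port. In the sketch below, urlHostPort is a hypothetical name, and webhook.example.com is a placeholder host. The sketch joins u.Hostname() rather than u.Host so that bracketed IPv6 hosts come out right, a small departure from the code above. Nothing here consults serviceResolver, so a hostname in the URL is left to ordinary name resolution when the connection is dialed.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// urlHostPort parses a webhook URL and defaults the port to 443, as the url
// branch of HookClient does.
func urlHostPort(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("unparsable URL: %w", err)
	}
	if u.Port() != "" {
		return u.Host, nil
	}
	return net.JoinHostPort(u.Hostname(), "443"), nil
}

func main() {
	hp, err := urlHostPort("https://webhook.example.com/validate")
	if err != nil {
		panic(err)
	}
	fmt.Println(hp) // webhook.example.com:443
}
```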

This article is part of the Tencent Cloud self-media sync program and is shared from a WeChat public account.
Originally published: 2021-07-12

Source: the 进击云原生 WeChat public account.


