前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >mac 上学习k8s系列(15)kube-apiserver源码阅读

mac 上学习k8s系列(15)kube-apiserver源码阅读

作者头像
golangLeetcode
发布2022-08-02 19:30:12
3030
发布2022-08-02 19:30:12
举报
文章被收录于专栏:golang算法架构leetcode技术php

kube-apiserver目录下文件比较多

代码语言:javascript
复制
.
|____app
| |____options
| | |____globalflags_test.go
| | |____options.go
| | |____globalflags.go
| | |____globalflags_providers.go
| | |____globalflags_providerless.go
| | |____options_test.go
| | |____validation.go
| | |____validation_test.go
| |____server.go
| |____aggregator.go
| |____testing
| | |____testdata
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.key
| | | |____README.md
| | | |____127.0.0.1_10.0.0.1_kubernetes.default.svc-kubernetes.default-kubernetes-localhost.crt
| | |____testserver.go
| |____apiextensions.go
| |____server_test.go
|____apiserver.go
|____OWNERS

入口文件仅仅是一个封装cmd/kube-apiserver/apiserver.go

代码语言:javascript
复制
command := app.NewAPIServerCommand()
command.Execute()

调用的是cmd/kube-apiserver/app/server.go里面的cobra.Command ,先创建对象,注册方法,和kubectl 一个套路:

代码语言:javascript
复制
func NewAPIServerCommand() *cobra.Command 
    s := options.NewServerRunOptions()
    err := checkNonZeroInsecurePort(fs)
    completedOptions, err := Complete(s)
     completedOptions.Validate()
     Run(completedOptions, genericapiserver.SetupSignalHandler())

在complete中校验了权限

代码语言:javascript
复制
  apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, err := getServiceIPAndRanges(s.ServiceClusterIPRanges)
  s.Authentication.ApplyAuthorization(s.Authorization)

然后run

代码语言:javascript
复制
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error 
    server, err := CreateServerChain(completeOptions, stopCh)
    prepared, err := server.PrepareRun()
    prepared.Run(stopCh)

重点看下CreateServerChain:

代码语言:javascript
复制
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

其中proxyTransport是一个httpTransport

代码语言:javascript
复制
proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
    DialContext:     proxyDialerFn,
    TLSClientConfig: proxyTLSClientConfig,
  })

然后创建了kubeAPIServer,创建server,包括扩展的apiserver和原生的apiserver,调用方法为createAPIExtensionsServer和CreateKubeAPIServer

代码语言:javascript
复制
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)

PrepareRun 主要是加了一些hook方法:

代码语言:javascript
复制
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
    go s.openAPIAggregationController.Run(context.StopCh)
    return nil
    })
s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)

同样apiserver的实现也在vendor里包装了下 vendor/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

代码语言:javascript
复制
func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error

创建Handler的方法实现在pkg/controlplane/instance.go

代码语言:javascript
复制
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error)
  routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
  Install(s.Handler.GoRestfulContainer)
  m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider);
  restStorageProviders := []RESTStorageProvider
  m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...);

InstallLegacyAPI里面注册了路由

代码语言:javascript
复制
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo);

具体实现位置是pkg/registry/core/rest/storage_core.go

代码语言:javascript
复制
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error)
    restStorageMap := map[string]rest.Storage{
      "pods":             podStorage.Pod,
      "pods/attach":      podStorage.Attach,
      "pods/status":      podStorage.Status,
      "pods/log":         podStorage.Log,
      "pods/exec":        podStorage.Exec,
      "pods/portforward": podStorage.PortForward,
      "pods/proxy":       podStorage.Proxy,
      "pods/binding":     podStorage.Binding,
      "bindings":         podStorage.LegacyBinding,
      "podTemplates": podTemplateStorage,
      "replicationControllers":        controllerStorage.Controller,
      "replicationControllers/status": controllerStorage.Status,
      "services":        serviceRest,
      "services/proxy":  serviceRestProxy,
      "services/status": serviceStatusStorage,
      "endpoints": endpointsStorage,
      "nodes":        nodeStorage.Node,
      "nodes/status": nodeStorage.Status,
      "nodes/proxy":  nodeStorage.Proxy,
      "events": eventStorage,
      "limitRanges":                   limitRangeStorage,
      "resourceQuotas":                resourceQuotaStorage,
      "resourceQuotas/status":         resourceQuotaStatusStorage,
      "namespaces":                    namespaceStorage,
      "namespaces/status":             namespaceStatusStorage,
      "namespaces/finalize":           namespaceFinalizeStorage,
      "secrets":                       secretStorage,
      "serviceAccounts":               serviceAccountStorage,
      "persistentVolumes":             persistentVolumeStorage,
      "persistentVolumes/status":      persistentVolumeStatusStorage,
      "persistentVolumeClaims":        persistentVolumeClaimStorage,
      "persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
      "configMaps":                    configMapStorage,
      "componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
    restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
    restStorageMap["pods/eviction"] = podStorage.Eviction
    restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
    restStorageMap["pods/ephemeralcontainers"] = podStorage.EphemeralContainers

将各个handler的路由方法注册到Container中去,完全遵循go-restful的设计模式,即将处理方法注册到Route中去,同一个根路径下的Route注册到WebService中去,WebService注册到Container中,Container负责分发。访问的过程为Container-->WebService-->Route。可以看到我们常用的接口,需要获取的资源路由都定义在这里。这里只是定义了一个map,真正注册成路由的地方在这里:

vendor/k8s.io/apiserver/pkg/server/genericapiserver.go

代码语言:javascript
复制
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error
    s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels);
    r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
    apiResources, resourceInfos, ws, registrationErrors := installer.Install()
    for _, path := range paths {
        apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)

上面路由的map其实定义了一个路径到对应storage的映射,以pod为例看下storage的实现:pkg/registry/core/pod/storage/storage.go

代码语言:javascript
复制
type PodStorage struct {
  Pod                 *REST
  Binding             *BindingREST
  LegacyBinding       *LegacyBindingREST
  Eviction            *EvictionREST
  Status              *StatusREST
  EphemeralContainers *EphemeralContainersREST
  Log                 *podrest.LogREST
  Proxy               *podrest.ProxyREST
  Exec                *podrest.ExecREST
  Attach              *podrest.AttachREST
  PortForward         *podrest.PortForwardREST
}

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) 
      Pod:  &REST{store, proxyTransport},

其中pod是一个REST类型的对象,REST的定义如下:

代码语言:javascript
复制
type REST struct {
  *genericregistry.Store
  proxyTransport http.RoundTripper
}
func (r *REST) ResourceLocation(ctx context.Context, name string) (*url.URL, http.RoundTripper, error)
  return registrypod.ResourceLocation(ctx, r, r.proxyTransport, name)

ResourceLocation定义在pkg/registry/core/pod/strategy.go

代码语言:javascript
复制
func ResourceLocation(ctx context.Context, getter ResourceGetter, rt http.RoundTripper, id string) (*url.URL, http.RoundTripper, error) 
    pod, err := getPod(ctx, getter, name)
    podIP := getPodIP(pod)
     err := proxyutil.IsProxyableIP(podIP);
代码语言:javascript
复制
func getPod(ctx context.Context, getter ResourceGetter, name string) (*api.Pod, error) 
    obj, err := getter.Get(ctx, name, &metav1.GetOptions{})
    pod := obj.(*api.Pod)

其中的Get方法,是实现了REST的一个接口

pkg/registry/core/namespace/storage/storage.go

代码语言:javascript
复制
func (r *REST) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
  return r.store.Get(ctx, name, options)
}

type REST struct {
  store  *genericregistry.Store
  status *genericregistry.Store
}

最终调用的是store的GET,从存储,也就是etcd里面读取元数据返回给用户。vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go

代码语言:javascript
复制
func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) 
    err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj);

type Store struct {
  Storage DryRunnableStorage
  // Storage is the interface for the underlying storage for the
  // resource. It is wrapped into a "DryRunnableStorage" that will
  // either pass-through or simply dry-run.
}

vendor/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go

代码语言:javascript
复制
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error
    return s.Storage.Get(ctx, key, opts, objPtr)

apiserver本质上就是一个server服务器,所有代码核心就是如何配置server,包括路由、访问权限以及同数据库(etcd)的交互等。

最终调用etcd的路径是:staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go

代码语言:javascript
复制
type store struct {
  client              *clientv3.Client
  codec               runtime.Codec
  versioner           storage.Versioner
  transformer         value.Transformer
  pathPrefix          string
  groupResource       schema.GroupResource
  groupResourceString string
  watcher             *watcher
  pagingEnabled       bool
  leaseManager        *leaseManager
}
代码语言:javascript
复制
func New(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface {
  return newStore(c, codec, newFunc, prefix, groupResource, transformer, pagingEnabled, leaseManagerConfig)
}

对应的Get方法如下

代码语言:javascript
复制
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
  key = path.Join(s.pathPrefix, key)
  startTime := time.Now()
  getResp, err := s.client.KV.Get(ctx, key)
  metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
  if err != nil {
    return err
  }
  if err = s.validateMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
    return err
  }

  if len(getResp.Kvs) == 0 {
    if opts.IgnoreNotFound {
      return runtime.SetZeroValue(out)
    }
    return storage.NewKeyNotFoundError(key, 0)
  }
  kv := getResp.Kvs[0]

  data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
  if err != nil {
    return storage.NewInternalError(err.Error())
  }

  return decode(s.codec, s.versioner, data, out, kv.ModRevision)
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-10-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档