前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详述 TKEStack 对 Kubernetes Aggregated APIServer 的最佳实践

详述 TKEStack 对 Kubernetes Aggregated APIServer 的最佳实践

作者头像
腾讯云原生
发布2020-11-16 10:10:27
1.1K0
发布2020-11-16 10:10:27
举报

作者杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事边缘计算以及云原生架构相关研发工作。

本文基于腾讯云容器服务开源版TKEStack v1.1.0版本,介绍了TKEStack基于Kubernetes Aggregated APIServer以及Controller的设计模式,并以business模块作为例子进行了详细说明。总的来说,开源项目TKEStack代码精炼且优雅,不失为Kubernetes AA的最佳实践。

前言

TKEStack是腾讯Kubernetes-native开源版容器管理平台,其整体架构设计如下所示:

本文主要针对Global Cluster的代码架构设计进行分析……

TKEStack Global Cluster

TKEStack Global cluster采用Kubernetes标准的Aggregated APIServer设计模式,目录结构如下所示:

代码语言:javascript
复制
├── cmd
│   ├── generate-images
│   │   └── main.go
│   ├── tke-auth-api
│   │   ├── app
│   │   └── auth.go
│   ├── tke-auth-controller
│   │   ├── app
│   │   └── controller-manager.go
│   ├── tke-business-api
│   │   ├── apiserver.go
│   │   └── app
│   ├── tke-business-controller
│   │   ├── app
│   │   └── controller-manager.go
│   ├── tke-gateway
│   │   ├── app
│   │   └── gateway.go
│   ├── tke-installer
│   │   ├── app
│   │   ├── assets
│   │   └── installer.go
│   ├── tke-monitor-api
│   │   ├── apiserver.go
│   │   └── app
│   ├── tke-monitor-controller
│   │   ├── app
│   │   └── controller-manager.go
│   ├── tke-notify-api
│   │   ├── apiserver.go
│   │   └── app
│   ├── tke-notify-controller
│   │   ├── app
│   │   └── controller-manager.go
│   ├── tke-platform-api
│   │   ├── apiserver.go
│   │   └── app
│   ├── tke-platform-controller
│   │   ├── app
│   │   └── controller-manager.go
│   └── tke-registry-api
│       ├── apiserver.go
│       └── app
├── pkg
│   ├── auth
│   ├── business
│   │   ├── apiserver
│   │   ├── controller
│   │   ├── registry
│   │   └── util
│   ├── gateway
│   ├── monitor
│   ├── notify
│   ├── platform
│   ├── registry

可以看到TKEStack将Global Cluster分成了若干个模块:gateway、registry、business等。每个模块对应TKEStack的若干自定义资源类型,例如这里的business对应TKEStack资源ProjectNamespacePortal。各模块均采用Aggregated APIServer和相应的Controller进行处理,这个会在后续进行分析。这里我们先依次介绍一下Kubernetes扩展工作负载的几个概念:

  • Custom Resource
  • Controller
  • Aggregated APIServer
  • Operator

Custom Resource

A resource is an endpoint in the Kubernetes API that stores a collection of API objects of a certain kind. For example, the built-in pods resource contains a collection of Pod objects. A custom resource is an extension of the Kubernetes API that is not necessarily available in a default Kubernetes installation. It represents a customization of a particular Kubernetes installation. However, many core Kubernetes functions are now built using custom resources, making Kubernetes more modular. Custom resources can appear and disappear in a running cluster through dynamic registration, and cluster admins can update custom resources independently of the cluster itself. Once a custom resource is installed, users can create and access its objects using kubectl, just as they do for built-in resources like Pods.

Custom Resource,简称CR,是Kubernetes自定义资源类型,与之相对应的就是Kubernetes内置的各种资源类型,例如Pod、Service等。利用CR我们可以定义任何想要的资源类型,例如这里TKEStack的Project

而对于如何使用CR,官方也给出了两种方式:

Kubernetes provides two ways to add custom resources to your cluster: CRDs are simple and can be created without any programming. API Aggregation requires programming, but allows more control over API behaviors like how data is stored and conversion between API versions. Kubernetes provides these two options to meet the needs of different users, so that neither ease of use nor flexibility is compromised. Aggregated APIs are subordinate APIServers that sit behind the primary API server, which acts as a proxy. This arrangement is called API Aggregation (AA). To users, it simply appears that the Kubernetes API is extended. CRDs allow users to create new types of resources without adding another APIserver. You do not need to understand API Aggregation to use CRDs. Regardless of how they are installed, the new resources are referred to as Custom Resources to distinguish them from built-in Kubernetes resources (like pods).

也即Aggregated APIServer和CRDs,这两种方式各有优缺点,适用场景也不相同,如下:

  • CRD更简单,不需要programming,更加轻量级;相比AA则需要专门programming并维护
  • Aggregated APIServer更加灵活,可以完成很多CRD不具备的事情,例如:对存储层的CRUD定制化操作

详细比较可以参考kubernetes-sigs/apiserver-builder-alpha(https://github.com/kubernetes-sigs/apiserver-builder-alpha/blob/master/docs/compare_with_kubebuilder.md)。

Controller

无论是采用Aggregated APIServer还是CRDs,都必须使用Controller。在Kubernetes中,Controller是一个控制循环,它通过apiserver监视集群的共享状态,并进行更改以尝试将当前状态移向所需状态(Reconcile Loop):

代码语言:javascript
复制
for {
  if current != desired {
    change(current, desired)
  }
}

而对于custom controller,原理和Kubernetes Controller Manager一致,如下:

详情参考sample controller(https://github.com/kubernetes/sample-controller)。

Aggregated APIServer

Aggregated APIServer Motivation

  • Extensibility: We want to allow community members to write their own API servers to expose APIs they want. Cluster admins should be able to use these servers without having to require any change in the core kubernetes repository.
  • Unblock new APIs from core kubernetes team review: A lot of new API proposals are currently blocked on review from the core kubernetes team. By allowing developers to expose their APIs as a separate server and enabling the cluster admin to use it without any change to the core kubernetes repository, we unblock these APIs.
  • Place for staging experimental APIs: New APIs can be developed in separate aggregated servers, and installed only by those willing to take the risk of installing an experimental API. Once they are stable, it is then easy to package them up for installation in other clusters.
  • Ensure that new APIs follow kubernetes conventions: Without the mechanism proposed here, community members might be forced to roll their own thing which may or may not follow kubernetes conventions.

Aggregated APIServer(简称AA)是Kubernetes提出的用于客户定制API需求的解决方案,也是Kubernetes扩展工作负载的一种方式。利用AA我们可以用Kubernetes-native的方式对CR做任何事情,其中最基本的就是存储层的CRUD操作了

AA通过向Kubernetes注册CR的方式来实现api-resource,如下是apiserver-builder-alpha对AA的工作流解释:

流程如下:

  • 1、对于CR的API请求,先发送给Kubernetes core APIServer,代理给AA
  • 2、AA接受请求,然后操作etcd,这个etcd可以与Kubernetes etcd共用,也可以单独部署一套
  • 3、Custom Controller Watch core APIServer CR资源变化(注意这个时候Watch会代理给AA)
  • 4、如果CR发生变化,Custom Controller接受到变化对象和相应事件,并对该CR进行相关操作,可以是CR本身,也可以是CR所关联的Kubernetes原生资源类型,例如Pod、Deployment等
  • 5、如果是CR本身的CRUD操作,则走core APIServer,然后代理给AA;否则直接走core APIServer

Operator

CRDs是扩展Kubernetes工作负载的第二种方式,也是相对简单的一种:

The CustomResourceDefinition API resource allows you to define custom resources. Defining a CRD object creates a new custom resource with a name and schema that you specify. The Kubernetes API serves and handles the storage of your custom resource. This frees you from writing your own API server to handle the custom resource, but the generic nature of the implementation means you have less flexibility than with API server aggregation

CRD通过yaml文件的形式向Kubernetes注册CR实现api-resource,用户无需额外编写AA,但是需要维护一个Custom Controller以处理CR变化,Kubernetes将这种方式称为Operator,也即:Operator=CRD+Controller。

TKEStack business Aggregated APIServer

在介绍完上述概念后,我们回到主题TKEStack,由于TKEStack各模块均采用AA方式(Aggregated APIServer+Custom Controller)编写,这里我们针对其中的business模块进行详细分析

TKEStack business模块整合了Project、Namespace、Platform等资源的处理:

代码语言:javascript
复制
├── cmd
│   ├── tke-business-api
│   │   ├── apiserver.go
│   │   └── app
│   ├── tke-business-controller
│   │   ├── app
│   │   └── controller-manager.go
├── pkg
    ├── business
        ├── apiserver
        │   ├── apiserver.go
        │   └── install.go
        ├── controller
        │   ├── chartgroup
        │   ├── imagenamespace
        │   ├── namespace
        │   └── project
        ├── registry
        │   ├── chartgroup
        │   ├── configmap
        │   ├── imagenamespace
        │   ├── namespace
        │   ├── platform
        │   ├── portal
        │   ├── project
        │   └── rest
        └── util
            ├── filter.go
            ├── label.go
            └── resource.go

启动TKEStack business Aggregated APIServer(github.com/tkestack/tke/cmd/tke-business-api/app/server.go:28):

代码语言:javascript
复制
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(cfg *config.Config) (*genericapiserver.GenericAPIServer, error) {
	apiServerConfig := createAPIServerConfig(cfg)
	apiServer, err := CreateAPIServer(apiServerConfig, genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}

	apiServer.GenericAPIServer.AddPostStartHookOrDie("start-business-api-server-informers", func(context genericapiserver.PostStartHookContext) error {
		cfg.VersionedSharedInformerFactory.Start(context.StopCh)
		return nil
	})

	return apiServer.GenericAPIServer, nil
}

// CreateAPIServer creates and wires a workable tke-business-api
func CreateAPIServer(apiServerConfig *apiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiserver.APIServer, error) {
	return apiServerConfig.Complete().New(delegateAPIServer)
}

func createAPIServerConfig(cfg *config.Config) *apiserver.Config {
	return &apiserver.Config{
		GenericConfig: &genericapiserver.RecommendedConfig{
			Config: *cfg.GenericAPIServerConfig,
		},
		ExtraConfig: apiserver.ExtraConfig{
			ServerName:              cfg.ServerName,
			VersionedInformers:      cfg.VersionedSharedInformerFactory,
			StorageFactory:          cfg.StorageFactory,
			APIResourceConfigSource: cfg.StorageFactory.APIResourceConfigSource,
			PlatformClient:          cfg.PlatformClient,
			RegistryClient:          cfg.RegistryClient,
			PrivilegedUsername:      cfg.PrivilegedUsername,
			FeatureOptions:          cfg.FeatureOptions,
		},
	}
}
...
// New returns a new instance of APIServer from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*APIServer, error) {
	s, err := c.GenericConfig.New(c.ExtraConfig.ServerName, delegationTarget)
	if err != nil {
		return nil, err
	}

	m := &APIServer{
		GenericAPIServer: s,
	}

	// The order here is preserved in discovery.
	restStorageProviders := []storage.RESTStorageProvider{
		&businessrest.StorageProvider{
			LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
			PlatformClient:       c.ExtraConfig.PlatformClient,
			RegistryClient:       c.ExtraConfig.RegistryClient,
			PrivilegedUsername:   c.ExtraConfig.PrivilegedUsername,
			Features:             c.ExtraConfig.FeatureOptions,
		},
	}
	m.InstallAPIs(c.ExtraConfig, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
	m.GenericAPIServer.AddPostStartHookOrDie("default-administrator", c.postStartHookFunc())

	return m, nil
}

注册CR(github.com/tkestack/tke/pkg/business/apiserver/apiserver.go:113):

代码语言:javascript
复制
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *APIServer) InstallAPIs(extraConfig *ExtraConfig, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...storage.RESTStorageProvider) {
	var apiGroupsInfo []genericapiserver.APIGroupInfo

	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		if !extraConfig.APIResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
			log.Infof("Skipping disabled API group %q.", groupName)
			continue
		}
		apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(extraConfig.APIResourceConfigSource, restOptionsGetter)
		if !enabled {
			log.Warnf("Problem initializing API group %q, skipping.", groupName)
			continue
		}
		log.Infof("Enabling API group %q.", groupName)

		if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
			name, hook, err := postHookProvider.PostStartHook()
			if err != nil {
				log.Fatalf("Error building PostStartHook: %v", err)
			}
			m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
		}

		apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
	}

	for i := range apiGroupsInfo {
		if err := m.GenericAPIServer.InstallAPIGroup(&apiGroupsInfo[i]); err != nil {
			log.Fatalf("Error in registering group versions: %v", err)
		}
	}
}
 
...
// github.com/tkestack/tke/pkg/business/registry/rest/rest.go:73
func (s *StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource,
	restOptionsGetter generic.RESTOptionsGetter, loopbackClientConfig *restclient.Config,
	features *options.FeatureOptions) map[string]rest.Storage {
	businessClient := businessinternalclient.NewForConfigOrDie(loopbackClientConfig)

	storageMap := make(map[string]rest.Storage)
	{
		projectREST := projectstorage.NewStorage(restOptionsGetter, businessClient, s.PlatformClient, s.PrivilegedUsername, features)
		storageMap["projects"] = projectREST.Project
		storageMap["projects/status"] = projectREST.Status
		storageMap["projects/finalize"] = projectREST.Finalize

		namespaceREST := namespacestorage.NewStorage(restOptionsGetter, businessClient, s.PlatformClient, s.PrivilegedUsername)
		storageMap["namespaces"] = namespaceREST.Namespace
		storageMap["namespaces/status"] = namespaceREST.Status

		platformREST := platformstorage.NewStorage(restOptionsGetter, businessClient, s.PrivilegedUsername)
		storageMap["platforms"] = platformREST.Platform

		portalREST := portalstorage.NewStorage(restOptionsGetter, businessClient)
		storageMap["portal"] = portalREST.Portal

		configMapREST := configmapstorage.NewStorage(restOptionsGetter)
		storageMap["configmaps"] = configMapREST.ConfigMap

		if s.RegistryClient != nil {
			imageNamespaceREST := imagenamespacestorage.NewStorage(restOptionsGetter, businessClient, s.RegistryClient, s.PrivilegedUsername)
			storageMap["imagenamespaces"] = imageNamespaceREST.ImageNamespace
			storageMap["imagenamespaces/status"] = imageNamespaceREST.Status
			storageMap["imagenamespaces/finalize"] = imageNamespaceREST.Finalize

			chartGroupREST := chartgroupstorage.NewStorage(restOptionsGetter, businessClient, s.RegistryClient, s.PrivilegedUsername)
			storageMap["chartgroups"] = chartGroupREST.ChartGroup
			storageMap["chartgroups/status"] = chartGroupREST.Status
			storageMap["chartgroups/finalize"] = chartGroupREST.Finalize
		}
	}

	return storageMap
}

business中相应的AA处理逻辑都在pkg/business/registry目录中,如下:

代码语言:javascript
复制
pkg/business/registry
├── chartgroup
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   └── validation.go
├── configmap
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   └── validation.go
├── imagenamespace
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   └── validation.go
├── namespace
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   ├── validation.go
│   └── validation_test.go
├── platform
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   └── validation.go
├── portal
│   └── storage
│       └── storage.go
├── project
│   ├── storage
│   │   └── storage.go
│   ├── strategy.go
│   ├── validation.go
│   └── validation_test.go
└── rest
    └── rest.go

而对应的Controller处理逻辑都在pkg/controller中,如下:

代码语言:javascript
复制
pkg/business/controller
├── chartgroup
│   ├── chartgroup_cache.go
│   ├── chartgroup_controller.go
│   ├── chartgroup_health.go
│   └── deletion
│       └── chartgroup_resources_deleter.go
├── imagenamespace
│   ├── deletion
│   │   └── imagenamespace_resources_deleter.go
│   ├── imagenamespace_cache.go
│   ├── imagenamespace_controller.go
│   └── imagenamespace_health.go
├── namespace
│   ├── cluster.go
│   ├── deletion
│   │   └── namespaced_resources_deleter.go
│   ├── namespace_cache.go
│   ├── namespace_controller.go
│   └── namespace_health.go
└── project
    ├── deletion
    │   └── projected_resources_deleter.go
    ├── project_cache.go
    └── project_controller.go

其中AA采用sample-apiserver编写,Custom Controller采用sample-controller编写。这里我们以创建一个TKEStack Namespace为例分析Namespace AA和Namespace Controller细节:

1、创建TKEStack Namespace:

代码语言:javascript
复制
namespace1 := &businessv1.Namespace{
	ObjectMeta: metav1.ObjectMeta{
		Name:      "project1-namespace1",
		Namespace: "project1",
	},
	Spec: businessv1.NamespaceSpec{
		ClusterName: "cluster1",
		Hard: businessv1.ResourceList{
			"cpu":    resource.MustParse("300m"),
			"memory": resource.MustParse("450Mi"),
		},
		Namespace: "project1",
	},
	Status: businessv1.NamespaceStatus{
		Used: businessv1.ResourceList{
			"cpu":    resource.MustParse("200m"),
			"memory": resource.MustParse("300Mi"),
		},
	},
}
_, _ = businessClient.BusinessV1().Namespaces("project1").Create(namespace1)

2、Namespace AA接受到请求,并对Namespace对象进行预处理、有效性检查以及ETCD写操作,如下(github.com/tkestack/tke/pkg/business/registry/namespace/storage/storage.go:53):

代码语言:javascript
复制
// NewStorage returns a Storage object that will work against namespace sets.
func NewStorage(optsGetter genericregistry.RESTOptionsGetter, businessClient *businessinternalclient.BusinessClient, platformClient platformversionedclient.PlatformV1Interface, privilegedUsername string) *Storage {
	strategy := namespace.NewStrategy(businessClient, platformClient)
	store := &registry.Store{
		NewFunc:                  func() runtime.Object { return &business.Namespace{} },
		NewListFunc:              func() runtime.Object { return &business.NamespaceList{} },
		DefaultQualifiedResource: business.Resource("namespaces"),
		PredicateFunc:            namespace.MatchNamespace,

		CreateStrategy: strategy,
		UpdateStrategy: strategy,
		DeleteStrategy: strategy,
		ExportStrategy: strategy,

		ShouldDeleteDuringUpdate: shouldDeleteDuringUpdate,
	}
	options := &genericregistry.StoreOptions{
		RESTOptions: optsGetter,
		AttrFunc:    namespace.GetAttrs,
	}

	if err := store.CompleteWithOptions(options); err != nil {
		log.Panic("Failed to create namespace etcd rest storage", log.Err(err))
	}

	statusStore := *store
	statusStore.UpdateStrategy = namespace.NewStatusStrategy(strategy)
	statusStore.ExportStrategy = namespace.NewStatusStrategy(strategy)

	finalizeStore := *store
	finalizeStore.UpdateStrategy = namespace.NewFinalizeStrategy(strategy)
	finalizeStore.ExportStrategy = namespace.NewFinalizeStrategy(strategy)

	return &Storage{
		Namespace: &REST{store, privilegedUsername},
		Status:    &StatusREST{&statusStore},
		Finalize:  &FinalizeREST{&finalizeStore},
	}
}
...
// Validate validates a new namespace.
func (s *Strategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
	return ValidateNamespace(obj.(*business.Namespace), nil,
		validation.NewObjectGetter(s.businessClient), validation.NewClusterGetter(s.platformClient))
}
...
// PrepareForCreate is invoked on create before validation to normalize
// the object.
func (s *Strategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
	_, tenantID := authentication.GetUsernameAndTenantID(ctx)
	namespace, _ := obj.(*business.Namespace)
	if len(tenantID) != 0 {
		namespace.Spec.TenantID = tenantID
	}

	if namespace.Spec.ClusterName != "" && namespace.Spec.Namespace != "" {
		namespace.ObjectMeta.GenerateName = ""
		namespace.ObjectMeta.Name = fmt.Sprintf("%s-%s", namespace.Spec.ClusterName, namespace.Spec.Namespace)
	} else {
		namespace.ObjectMeta.GenerateName = "ns"
		namespace.ObjectMeta.Name = ""
	}

	namespace.Spec.Finalizers = []business.FinalizerName{
		business.NamespaceFinalize,
	}
}

3、Namespace Controller Watch到Namespace Add事件,对该事件进行处理如下(github.com/tkestack/tke/pkg/business/controller/namespace/namespace_controller.go:201):

代码语言:javascript
复制
// syncItem will sync the Namespace with the given key if it has had
// its expectations fulfilled, meaning it did not expect to see any more of its
// namespaces created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (c *Controller) syncItem(key string) error {
	startTime := time.Now()
	defer func() {
		log.Info("Finished syncing namespace", log.String("namespace", key), log.Duration("processTime", time.Since(startTime)))
	}()

	projectName, namespaceName, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Namespace holds the latest Namespace info from apiserver
	namespace, err := c.lister.Namespaces(projectName).Get(namespaceName)
	switch {
	case errors.IsNotFound(err):
		log.Info("Namespace has been deleted. Attempting to cleanup resources", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
		err = c.processDeletion(key)
	case err != nil:
		log.Warn("Unable to retrieve namespace from store", log.String("projectName", projectName), log.String("namespaceName", namespaceName), log.Err(err))
	default:
		if namespace.Status.Phase == v1.NamespacePending || namespace.Status.Phase == v1.NamespaceAvailable || namespace.Status.Phase == v1.NamespaceFailed {
			cachedNamespace := c.cache.getOrCreate(key)
			err = c.processUpdate(cachedNamespace, namespace, key)
		} else if namespace.Status.Phase == v1.NamespaceTerminating {
			log.Info("Namespace has been terminated. Attempting to cleanup resources", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
			_ = c.processDeletion(key)
			err = c.namespacedResourcesDeleter.Delete(projectName, namespaceName)
		} else {
			log.Debug(fmt.Sprintf("Namespace %s status is %s, not to process", key, namespace.Status.Phase))
		}
	}
	return err
}

由于默认是v1.NamespacePending状态,所以这里会跳转到processUpdate进行处理:

代码语言:javascript
复制
...
func (c *Controller) processUpdate(cachedNamespace *cachedNamespace, namespace *v1.Namespace, key string) error {
	if cachedNamespace.state != nil {
		// exist and the namespace name changed
		if cachedNamespace.state.UID != namespace.UID {
			if err := c.processDelete(cachedNamespace, key); err != nil {
				return err
			}
		}
	}
	// start update machine if needed
	err := c.handlePhase(key, cachedNamespace, namespace)
	if err != nil {
		return err
	}
	cachedNamespace.state = namespace
	// Always update the cache upon success.
	c.cache.set(key, cachedNamespace)
	return nil
}

func (c *Controller) handlePhase(key string, cachedNamespace *cachedNamespace, namespace *v1.Namespace) error {
	switch namespace.Status.Phase {
	case v1.NamespacePending:
		if err := c.calculateProjectUsed(cachedNamespace, namespace); err != nil {
			return err
		}
		if err := c.createNamespaceOnCluster(namespace); err != nil {
			namespace.Status.Phase = v1.NamespaceFailed
			namespace.Status.Message = "CreateNamespaceOnClusterFailed"
			namespace.Status.Reason = err.Error()
			namespace.Status.LastTransitionTime = metav1.Now()
			return c.persistUpdate(namespace)
		}
		namespace.Status.Phase = v1.NamespaceAvailable
		namespace.Status.Message = ""
		namespace.Status.Reason = ""
		namespace.Status.LastTransitionTime = metav1.Now()
		return c.persistUpdate(namespace)
	case v1.NamespaceAvailable, v1.NamespaceFailed:
		c.startNamespaceHealthCheck(key)
	}
	return nil
}

这里会先将Namespace的资源限制添加到对应的Project的资源限制,并更新Project,这样Project的资源配额就下发到Namespace中去了:

代码语言:javascript
复制
func (c *Controller) calculateProjectUsed(cachedNamespace *cachedNamespace, namespace *v1.Namespace) error {
	project, err := c.client.BusinessV1().Projects().Get(namespace.ObjectMeta.Namespace, metav1.GetOptions{})
	if err != nil {
		log.Error("Failed to get the project", log.String("projectName", namespace.ObjectMeta.Namespace), log.Err(err))
		return err
	}
	calculatedNamespaceNames := sets.NewString(project.Status.CalculatedNamespaces...)
	if !calculatedNamespaceNames.Has(project.ObjectMeta.Name) {
		project.Status.CalculatedNamespaces = append(project.Status.CalculatedNamespaces, namespace.ObjectMeta.Name)
		if project.Status.Clusters == nil {
			project.Status.Clusters = make(v1.ClusterUsed)
		}
		businessUtil.AddClusterHardToUsed(&project.Status.Clusters,
			v1.ClusterHard{
				namespace.Spec.ClusterName: v1.HardQuantity{
					Hard: namespace.Spec.Hard,
				},
			})
		return c.persistUpdateProject(project)
	}
	if cachedNamespace.state != nil && !reflect.DeepEqual(cachedNamespace.state.Spec.Hard, namespace.Spec.Hard) {
		if project.Status.Clusters == nil {
			project.Status.Clusters = make(v1.ClusterUsed)
		}
		// sub old
		businessUtil.SubClusterHardFromUsed(&project.Status.Clusters,
			v1.ClusterHard{
				namespace.Spec.ClusterName: v1.HardQuantity{
					Hard: cachedNamespace.state.Spec.Hard,
				},
			})
		// add new
		businessUtil.AddClusterHardToUsed(&project.Status.Clusters,
			v1.ClusterHard{
				namespace.Spec.ClusterName: v1.HardQuantity{
					Hard: namespace.Spec.Hard,
				},
			})
		return c.persistUpdateProject(project)
	}
	return nil
}
...
func (c *Controller) persistUpdateProject(project *v1.Project) error {
	var err error
	for i := 0; i < clientRetryCount; i++ {
		_, err = c.client.BusinessV1().Projects().UpdateStatus(project)
		if err == nil {
			return nil
		}
		if errors.IsNotFound(err) {
			log.Info("Not persisting update to projects that no longer exists", log.String("projectName", project.ObjectMeta.Name), log.Err(err))
			return nil
		}
		if errors.IsConflict(err) {
			return fmt.Errorf("not persisting update to projects '%s' that has been changed since we received it: %v", project.ObjectMeta.Name, err)
		}
		log.Warn(fmt.Sprintf("Failed to persist updated status of projects '%s/%s'", project.ObjectMeta.Name, project.Status.Phase), log.String("projectName", project.ObjectMeta.Name), log.Err(err))
		time.Sleep(clientRetryInterval)
	}
	return err
}

接着创建TKEStack Namespace对应的Kubernetes namespace以及相应的namespace ResourceQuotas,用于限制该namespace资源使用配额,如下:

代码语言:javascript
复制
func createNamespaceOnCluster(kubeClient *kubernetes.Clientset, namespace *v1.Namespace) error {
	ns, err := kubeClient.CoreV1().Namespaces().Get(namespace.Spec.Namespace, metav1.GetOptions{})
	if err != nil && errors.IsNotFound(err) {
		// create namespace
		nsOnCluster := &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{
				Name: namespace.Spec.Namespace,
				Labels: map[string]string{
					util.LabelProjectName:   namespace.ObjectMeta.Namespace,
					util.LabelNamespaceName: namespace.ObjectMeta.Name,
				},
			},
		}
		_, err := kubeClient.CoreV1().Namespaces().Create(nsOnCluster)
		if err != nil {
			log.Error("Failed to create the namespace on cluster", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
			return err
		}
		return nil
	}
	if err != nil {
		log.Error("Failed to get the namespace on cluster", log.String("namespace", namespace.Spec.Namespace), log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
		return err
	}
	projectName, ok := ns.ObjectMeta.Labels[util.LabelProjectName]
	if !ok {
		ns.Labels = make(map[string]string)
		ns.Labels[util.LabelProjectName] = namespace.ObjectMeta.Namespace
		ns.Labels[util.LabelNamespaceName] = namespace.ObjectMeta.Name
		_, err := kubeClient.CoreV1().Namespaces().Update(ns)
		if err != nil {
			log.Error("Failed to update the namespace on cluster", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
			return err
		}
		return nil
	}
	if projectName != namespace.ObjectMeta.Namespace {
		log.Error("The namespace in the cluster already belongs to another project and cannot be attributed to this project", log.String("clusterName", namespace.Spec.ClusterName), log.String("namespace", namespace.Spec.Namespace))
		return fmt.Errorf("namespace in the cluster already belongs to another project(%s) and cannot be attributed to this project(%s)", projectName, namespace.ObjectMeta.Namespace)
	}
	return nil
}
...
func createResourceQuotaOnCluster(kubeClient *kubernetes.Clientset, namespace *v1.Namespace) error {
	resourceQuota, err := kubeClient.CoreV1().ResourceQuotas(namespace.Spec.Namespace).Get(namespace.Spec.Namespace, metav1.GetOptions{})
	if err != nil && errors.IsNotFound(err) {
		// create resource quota
		resourceList := resource.ConvertToCoreV1ResourceList(namespace.Spec.Hard)
		rq := &corev1.ResourceQuota{
			ObjectMeta: metav1.ObjectMeta{
				Name:      namespace.Spec.Namespace,
				Namespace: namespace.Spec.Namespace,
			},
			Spec: corev1.ResourceQuotaSpec{
				Hard: resourceList,
			},
		}
		_, err := kubeClient.CoreV1().ResourceQuotas(namespace.Spec.Namespace).Create(rq)
		if err != nil {
			log.Error("Failed to create the resource quota on cluster", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
			return err
		}
		return nil
	}
	if err != nil {
		log.Error("Failed to get the resource quota on cluster", log.String("namespace", namespace.Spec.Namespace), log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
		return err
	}
	resourceList := resource.ConvertToCoreV1ResourceList(namespace.Spec.Hard)
	if !reflect.DeepEqual(resourceQuota.Spec.Hard, resourceList) {
		resourceQuota.Spec.Hard = resourceList
		_, err := kubeClient.CoreV1().ResourceQuotas(namespace.Spec.Namespace).Update(resourceQuota)
		if err != nil {
			log.Error("Failed to update the resource quota on cluster", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
			return err
		}
	}
	return nil
}

最后更新TKEStack Namespace状态,如下:

代码语言:javascript
复制
func (c *Controller) handlePhase(key string, cachedNamespace *cachedNamespace, namespace *v1.Namespace) error {
	switch namespace.Status.Phase {
	case v1.NamespacePending:
		if err := c.calculateProjectUsed(cachedNamespace, namespace); err != nil {
			return err
		}
		if err := c.createNamespaceOnCluster(namespace); err != nil {
			namespace.Status.Phase = v1.NamespaceFailed
			namespace.Status.Message = "CreateNamespaceOnClusterFailed"
			namespace.Status.Reason = err.Error()
			namespace.Status.LastTransitionTime = metav1.Now()
			return c.persistUpdate(namespace)
		}
		namespace.Status.Phase = v1.NamespaceAvailable
		namespace.Status.Message = ""
		namespace.Status.Reason = ""
		namespace.Status.LastTransitionTime = metav1.Now()
		return c.persistUpdate(namespace)
	case v1.NamespaceAvailable, v1.NamespaceFailed:
		c.startNamespaceHealthCheck(key)
	}
	return nil
}
...
func (c *Controller) persistUpdate(namespace *v1.Namespace) error {
	var err error
	for i := 0; i < clientRetryCount; i++ {
		_, err = c.client.BusinessV1().Namespaces(namespace.ObjectMeta.Namespace).UpdateStatus(namespace)
		if err == nil {
			return nil
		}
		if errors.IsNotFound(err) {
			log.Info("Not persisting update to namespace that no longer exists", log.String("projectName", namespace.ObjectMeta.Namespace), log.String("namespaceName", namespace.ObjectMeta.Name), log.Err(err))
			return nil
		}
		if errors.IsConflict(err) {
			return fmt.Errorf("not persisting update to namespace '%s/%s' that has been changed since we received it: %v", namespace.ObjectMeta.Namespace, namespace.ObjectMeta.Name, err)
		}
		log.Warn(fmt.Sprintf("Failed to persist updated status of namespace '%s/%s/%s'", namespace.ObjectMeta.Namespace, namespace.ObjectMeta.Name, namespace.Status.Phase), log.String("namespaceName", namespace.ObjectMeta.Name), log.Err(err))
		time.Sleep(clientRetryInterval)
	}
	return err
}

4、Namespace AA接受到Update请求,进行预处理并检查有效性,最后对该Namespace进行ETCD Update操作,处理如下:

代码语言:javascript
复制
// NewStorage returns a Storage object that will work against namespace sets.
func NewStorage(optsGetter genericregistry.RESTOptionsGetter, businessClient *businessinternalclient.BusinessClient, platformClient platformversionedclient.PlatformV1Interface, privilegedUsername string) *Storage {
	strategy := namespace.NewStrategy(businessClient, platformClient)
	store := &registry.Store{
		NewFunc:                  func() runtime.Object { return &business.Namespace{} },
		NewListFunc:              func() runtime.Object { return &business.NamespaceList{} },
		DefaultQualifiedResource: business.Resource("namespaces"),
		PredicateFunc:            namespace.MatchNamespace,

		CreateStrategy: strategy,
		UpdateStrategy: strategy,
		DeleteStrategy: strategy,
		ExportStrategy: strategy,

		ShouldDeleteDuringUpdate: shouldDeleteDuringUpdate,
	}
	options := &genericregistry.StoreOptions{
		RESTOptions: optsGetter,
		AttrFunc:    namespace.GetAttrs,
	}

	if err := store.CompleteWithOptions(options); err != nil {
		log.Panic("Failed to create namespace etcd rest storage", log.Err(err))
	}

	statusStore := *store
	statusStore.UpdateStrategy = namespace.NewStatusStrategy(strategy)
	statusStore.ExportStrategy = namespace.NewStatusStrategy(strategy)

	finalizeStore := *store
	finalizeStore.UpdateStrategy = namespace.NewFinalizeStrategy(strategy)
	finalizeStore.ExportStrategy = namespace.NewFinalizeStrategy(strategy)

	return &Storage{
		Namespace: &REST{store, privilegedUsername},
		Status:    &StatusREST{&statusStore},
		Finalize:  &FinalizeREST{&finalizeStore},
	}
}
...
// PrepareForUpdate is invoked on update before validation to normalize
// the object.  For example: remove fields that are not to be persisted,
// sort order-insensitive list fields, etc.  This should not remove fields
// whose presence would be considered a validation error.
func (StatusStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
	newNamespace := obj.(*business.Namespace)
	oldNamespace := old.(*business.Namespace)
	newNamespace.Spec = oldNamespace.Spec
}

// ValidateUpdate is invoked after default fields in the object have been
// filled in before the object is persisted.  This method should not mutate
// the object.
func (s *StatusStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
	return ValidateNamespaceUpdate(obj.(*business.Namespace), old.(*business.Namespace),
		validation.NewObjectGetter(s.businessClient), validation.NewClusterGetter(s.platformClient))
}

// Update alters the status subset of an object.
func (r *StatusREST) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
	// We are explicitly setting forceAllowCreate to false in the call to the underlying storage because
	// subresources should never allow create on update.
	_, err := ValidateGetObjectAndTenantID(ctx, r.store, name, &metav1.GetOptions{})
	if err != nil {
		return nil, false, err
	}
	return r.store.Update(ctx, name, objInfo, createValidation, updateValidation, false, options)
}

5、在Update之后,Namespace Controller Watch到该事件,由于这个时候该Namespace对象的namespace.Status.Phase为NamespaceFailed或者NamespaceAvailable,所以会进入processUpdate处理:

代码语言:javascript
复制
func (c *Controller) handlePhase(key string, cachedNamespace *cachedNamespace, namespace *v1.Namespace) error {
	switch namespace.Status.Phase {
	case v1.NamespacePending:
		if err := c.calculateProjectUsed(cachedNamespace, namespace); err != nil {
			return err
		}
		if err := c.createNamespaceOnCluster(namespace); err != nil {
			namespace.Status.Phase = v1.NamespaceFailed
			namespace.Status.Message = "CreateNamespaceOnClusterFailed"
			namespace.Status.Reason = err.Error()
			namespace.Status.LastTransitionTime = metav1.Now()
			return c.persistUpdate(namespace)
		}
		namespace.Status.Phase = v1.NamespaceAvailable
		namespace.Status.Message = ""
		namespace.Status.Reason = ""
		namespace.Status.LastTransitionTime = metav1.Now()
		return c.persistUpdate(namespace)
	case v1.NamespaceAvailable, v1.NamespaceFailed:
		c.startNamespaceHealthCheck(key)
	}
	return nil
}
...
func (c *Controller) startNamespaceHealthCheck(key string) {
	if !c.health.Exist(key) {
		c.health.Set(key)
		go func() {
			if err := wait.PollImmediateUntil(1*time.Minute, c.watchNamespaceHealth(key), c.stopCh); err != nil {
				log.Error("Failed to wait poll immediate until", log.Err(err))
			}
		}()
		log.Info("Namespace phase start new health check", log.String("namespace", key))
	} else {
		log.Info("Namespace phase health check exit", log.String("namespace", key))
	}
}
...
// for PollImmediateUntil, when return true ,an err while exit
func (c *Controller) watchNamespaceHealth(key string) func() (bool, error) {
	return func() (bool, error) {
		log.Debug("Check namespace health", log.String("namespace", key))

		if !c.health.Exist(key) {
			return true, nil
		}

		projectName, namespaceName, err := cache.SplitMetaNamespaceKey(key)
		if err != nil {
			log.Error("Failed to split meta namespace key", log.String("key", key))
			c.health.Del(key)
			return true, nil
		}
		namespace, err := c.client.BusinessV1().Namespaces(projectName).Get(namespaceName, metav1.GetOptions{})
		if err != nil && errors.IsNotFound(err) {
			log.Error("Namespace not found, to exit the health check loop", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
			c.health.Del(key)
			return true, nil
		}
		if err != nil {
			log.Error("Check namespace health, namespace get failed", log.String("projectName", projectName), log.String("namespaceName", namespaceName), log.Err(err))
			return false, nil
		}
		// if status is terminated,to exit the  health check loop
		if namespace.Status.Phase == v1.NamespaceTerminating || namespace.Status.Phase == v1.NamespacePending {
			log.Warn("Namespace status is terminated, to exit the health check loop", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
			c.health.Del(key)
			return true, nil
		}

		if err := c.checkNamespaceHealth(namespace); err != nil {
			log.Error("Failed to check namespace health", log.String("projectName", projectName), log.String("namespaceName", namespaceName), log.Err(err))
		}
		return false, nil
	}
}
...
func (c *Controller) checkNamespaceHealth(namespace *v1.Namespace) error {
	// build client
	kubeClient, err := util.BuildExternalClientSetWithName(c.platformClient, namespace.Spec.ClusterName)
	if err != nil {
		return err
	}
	message, reason := checkNamespaceOnCluster(kubeClient, namespace)
	if message != "" {
		namespace.Status.Phase = v1.NamespaceFailed
		namespace.Status.LastTransitionTime = metav1.Now()
		namespace.Status.Message = message
		namespace.Status.Reason = reason
		return c.persistUpdate(namespace)
	}
	message, reason, used := calculateNamespaceUsed(kubeClient, namespace)
	if message != "" {
		namespace.Status.Phase = v1.NamespaceFailed
		namespace.Status.LastTransitionTime = metav1.Now()
		namespace.Status.Message = message
		namespace.Status.Reason = reason
		return c.persistUpdate(namespace)
	}
	if namespace.Status.Phase != v1.NamespaceAvailable || !reflect.DeepEqual(namespace.Status.Used, used) {
		if namespace.Status.Phase != v1.NamespaceAvailable {
			namespace.Status.LastTransitionTime = metav1.Now()
		}
		namespace.Status.Phase = v1.NamespaceAvailable
		namespace.Status.Used = used
		namespace.Status.Message = ""
		namespace.Status.Reason = ""
		return c.persistUpdate(namespace)
	}
	return nil
}

这里checkNamespaceHealth会检查Namespace对应的Kubernetes namespace是否存在,检查namespace ResourceQuota是否存在,并更新正确的namespace.Status.Used

之后会设置c.health cache,以便后续不会重复执行健康检查操作:

代码语言:javascript
复制
func (c *Controller) startNamespaceHealthCheck(key string) {
	if !c.health.Exist(key) {
		c.health.Set(key)
		go func() {
			if err := wait.PollImmediateUntil(1*time.Minute, c.watchNamespaceHealth(key), c.stopCh); err != nil {
				log.Error("Failed to wait poll immediate until", log.Err(err))
			}
		}()
		log.Info("Namespace phase start new health check", log.String("namespace", key))
	} else {
		log.Info("Namespace phase health check exit", log.String("namespace", key))
	}
}

之后如果用户删除该TKEStack Namespace,则进入Delete函数(github.com/tkestack/tke/pkg/business/registry/namespace/storage/storage.go:171):

代码语言:javascript
复制
// Delete enforces life-cycle rules for cluster termination
func (r *REST) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
	nsObj, err := ValidateGetObjectAndTenantID(ctx, r.Store, name, &metav1.GetOptions{})
	if err != nil {
		return nil, false, err
	}

	ns := nsObj.(*business.Namespace)

	// Ensure we have a UID precondition
	if options == nil {
		options = metav1.NewDeleteOptions(0)
	}
	if options.Preconditions == nil {
		options.Preconditions = &metav1.Preconditions{}
	}
	if options.Preconditions.UID == nil {
		options.Preconditions.UID = &ns.UID
	} else if *options.Preconditions.UID != ns.UID {
		err = errors.NewConflict(
			business.Resource("namespaces"),
			name,
			fmt.Errorf("precondition failed: UID in precondition: %v, UID in object meta: %v", *options.Preconditions.UID, ns.UID),
		)
		return nil, false, err
	}
	if options.Preconditions.ResourceVersion != nil && *options.Preconditions.ResourceVersion != ns.ResourceVersion {
		err = errors.NewConflict(
			business.Resource("namespaces"),
			name,
			fmt.Errorf("precondition failed: ResourceVersion in precondition: %v, ResourceVersion in object meta: %v", *options.Preconditions.ResourceVersion, ns.ResourceVersion),
		)
		return nil, false, err
	}

	if ns.DeletionTimestamp.IsZero() {
		key, err := r.Store.KeyFunc(ctx, name)
		if err != nil {
			return nil, false, err
		}

		preconditions := storage.Preconditions{UID: options.Preconditions.UID, ResourceVersion: options.Preconditions.ResourceVersion}

		out := r.Store.NewFunc()
		err = r.Store.Storage.GuaranteedUpdate(
			ctx, key, out, false, &preconditions,
			storage.SimpleUpdate(func(existing runtime.Object) (runtime.Object, error) {
				existingNamespace, ok := existing.(*business.Namespace)
				if !ok {
					// wrong type
					return nil, fmt.Errorf("expected *business.Namespace, got %v", existing)
				}
				if err := deleteValidation(ctx, existingNamespace); err != nil {
					return nil, err
				}
				// Set the deletion timestamp if needed
				if existingNamespace.DeletionTimestamp.IsZero() {
					now := metav1.Now()
					existingNamespace.DeletionTimestamp = &now
				}
				// Set the namespace phase to terminating, if needed
				if existingNamespace.Status.Phase != business.NamespaceTerminating {
					existingNamespace.Status.Phase = business.NamespaceTerminating
				}

				// the current finalizers which are on namespace
				currentFinalizers := map[string]bool{}
				for _, f := range existingNamespace.Finalizers {
					currentFinalizers[f] = true
				}
				// the finalizers we should ensure on namespace
				shouldHaveFinalizers := map[string]bool{
					metav1.FinalizerOrphanDependents: apiserverutil.ShouldHaveOrphanFinalizer(options, currentFinalizers[metav1.FinalizerOrphanDependents]),
					metav1.FinalizerDeleteDependents: apiserverutil.ShouldHaveDeleteDependentsFinalizer(options, currentFinalizers[metav1.FinalizerDeleteDependents]),
				}
				// determine whether there are changes
				changeNeeded := false
				for finalizer, shouldHave := range shouldHaveFinalizers {
					changeNeeded = currentFinalizers[finalizer] != shouldHave || changeNeeded
					if shouldHave {
						currentFinalizers[finalizer] = true
					} else {
						delete(currentFinalizers, finalizer)
					}
				}
				// make the changes if needed
				if changeNeeded {
					var newFinalizers []string
					for f := range currentFinalizers {
						newFinalizers = append(newFinalizers, f)
					}
					existingNamespace.Finalizers = newFinalizers
				}
				return existingNamespace, nil
			}),
			dryrun.IsDryRun(options.DryRun),
		)

		if err != nil {
			err = storageerr.InterpretGetError(err, business.Resource("namespaces"), name)
			err = storageerr.InterpretUpdateError(err, business.Resource("namespaces"), name)
			if _, ok := err.(*errors.StatusError); !ok {
				err = errors.NewInternalError(err)
			}
			return nil, false, err
		}

		return out, false, nil
	}

	// prior to final deletion, we must ensure that finalizers is empty
	if len(ns.Spec.Finalizers) != 0 {
		err = errors.NewConflict(business.Resource("namespaces"), ns.Name, fmt.Errorf("the system is ensuring all content is removed from this namespace.  Upon completion, this namespace will automatically be purged by the system"))
		return nil, false, err
	}
	return r.Store.Delete(ctx, name, deleteValidation, options)
}

这里会Update TKEStack Namespace.Status.Phase = business.NamespaceTerminating,TKEStack Namespace Conroller接受到该事件,进行处理如下:

代码语言:javascript
复制
// syncItem will sync the Namespace with the given key if it has had
// its expectations fulfilled, meaning it did not expect to see any more of its
// namespaces created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (c *Controller) syncItem(key string) error {
	startTime := time.Now()
	defer func() {
		log.Info("Finished syncing namespace", log.String("namespace", key), log.Duration("processTime", time.Since(startTime)))
	}()

	projectName, namespaceName, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// Namespace holds the latest Namespace info from apiserver
	namespace, err := c.lister.Namespaces(projectName).Get(namespaceName)
	switch {
	case errors.IsNotFound(err):
		log.Info("Namespace has been deleted. Attempting to cleanup resources", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
		err = c.processDeletion(key)
	case err != nil:
		log.Warn("Unable to retrieve namespace from store", log.String("projectName", projectName), log.String("namespaceName", namespaceName), log.Err(err))
	default:
		if namespace.Status.Phase == v1.NamespacePending || namespace.Status.Phase == v1.NamespaceAvailable || namespace.Status.Phase == v1.NamespaceFailed {
			cachedNamespace := c.cache.getOrCreate(key)
			err = c.processUpdate(cachedNamespace, namespace, key)
		} else if namespace.Status.Phase == v1.NamespaceTerminating {
			log.Info("Namespace has been terminated. Attempting to cleanup resources", log.String("projectName", projectName), log.String("namespaceName", namespaceName))
			_ = c.processDeletion(key)
			err = c.namespacedResourcesDeleter.Delete(projectName, namespaceName)
		} else {
			log.Debug(fmt.Sprintf("Namespace %s status is %s, not to process", key, namespace.Status.Phase))
		}
	}
	return err
}

首先删除cache数据:

代码语言:javascript
复制
func (c *Controller) processDeletion(key string) error {
	cachedNamespace, ok := c.cache.get(key)
	if !ok {
		log.Debug("Namespace not in cache even though the watcher thought it was. Ignoring the deletion", log.String("name", key))
		return nil
	}
	return c.processDelete(cachedNamespace, key)
}

func (c *Controller) processDelete(cachedNamespace *cachedNamespace, key string) error {
	log.Info("Namespace will be dropped", log.String("name", key))

	if c.cache.Exist(key) {
		log.Info("Delete the namespace cache", log.String("name", key))
		c.cache.delete(key)
	}

	if c.health.Exist(key) {
		log.Info("Delete the namespace health cache", log.String("name", key))
		c.health.Del(key)
	}

	return nil
}

接着删除TKEStack Namespace对应资源,如下:

代码语言:javascript
复制
// Delete deletes all resources in the given namespace.
// Before deleting resources:
// * It ensures that deletion timestamp is set on the
//   namespace (does nothing if deletion timestamp is missing).
// * Verifies that the namespace is in the "terminating" phase
//   (updates the namespace phase if it is not yet marked terminating)
// After deleting the resources:
// * It removes finalizer token from the given namespace.
// * Deletes the namespace if deleteNamespaceWhenDone is true.
//
// Returns an error if any of those steps fail.
// Returns ResourcesRemainingError if it deleted some resources but needs
// to wait for them to go away.
// Caller is expected to keep calling this until it succeeds.
func (d *namespacedResourcesDeleter) Delete(projectName string, namespaceName string) error {
	// Multiple controllers may edit a namespace during termination
	// first get the latest state of the namespace before proceeding
	// if the namespace was deleted already, don't do anything
	namespace, err := d.businessClient.Namespaces(projectName).Get(namespaceName, metav1.GetOptions{})
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}
	if namespace.DeletionTimestamp == nil {
		return nil
	}

	log.Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, d.finalizerToken)

	// ensure that the status is up to date on the namespace
	// if we get a not found error, we assume the namespace is truly gone
	namespace, err = d.retryOnConflictError(namespace, d.updateNamespaceStatusFunc)
	if err != nil {
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// the latest view of the namespace asserts that namespace is no longer deleting..
	if namespace.DeletionTimestamp.IsZero() {
		return nil
	}

	// Delete the namespace if it is already finalized.
	if d.deleteNamespaceWhenDone && finalized(namespace) {
		return d.deleteNamespace(namespace)
	}

	// there may still be content for us to remove
	err = d.deleteAllContent(namespace)
	if err != nil {
		return err
	}

	// we have removed content, so mark it finalized by us
	namespace, err = d.retryOnConflictError(namespace, d.finalizeNamespace)
	if err != nil {
		// in normal practice, this should not be possible, but if a deployment is running
		// two controllers to do namespace deletion that share a common finalizer token it's
		// possible that a not found could occur since the other controller would have finished the delete.
		if errors.IsNotFound(err) {
			return nil
		}
		return err
	}

	// Check if we can delete now.
	if d.deleteNamespaceWhenDone && finalized(namespace) {
		return d.deleteNamespace(namespace)
	}
	return nil
}

删除TKEStack Namespace:

代码语言:javascript
复制
...
// Deletes the given namespace.
func (d *namespacedResourcesDeleter) deleteNamespace(namespace *v1.Namespace) error {
	var opts *metav1.DeleteOptions
	uid := namespace.UID
	if len(uid) > 0 {
		opts = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}}
	}
	err := d.businessClient.Namespaces(namespace.ObjectMeta.Namespace).Delete(namespace.ObjectMeta.Name, opts)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	return nil
}

重新调整对应Project的资源配额:

代码语言:javascript
复制
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *namespacedResourcesDeleter) deleteAllContent(namespace *v1.Namespace) error {
	log.Debug("Namespace controller - deleteAllContent", log.String("namespaceName", namespace.ObjectMeta.Name))

	var errs []error
	for _, deleteFunc := range deleteResourceFuncs {
		err := deleteFunc(d, namespace)
		if err != nil {
			// If there is an error, hold on to it but proceed with all the remaining resource.
			errs = append(errs, err)
		}
	}

	if len(errs) > 0 {
		return utilerrors.NewAggregate(errs)
	}

	log.Debug("Namespace controller - deletedAllContent", log.String("namespaceName", namespace.ObjectMeta.Name))
	return nil
}

var deleteResourceFuncs = []deleteResourceFunc{
	recalculateProjectUsed,
	deleteNamespaceOnCluster,
}

func recalculateProjectUsed(deleter *namespacedResourcesDeleter, namespace *v1.Namespace) error {
	log.Debug("Namespace controller - recalculateProjectUsed", log.String("namespaceName", namespace.ObjectMeta.Name))

	project, err := deleter.businessClient.Projects().Get(namespace.ObjectMeta.Namespace, metav1.GetOptions{})
	if err != nil {
		log.Error("Failed to get the project", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("projectName", namespace.ObjectMeta.Namespace), log.Err(err))
		return err
	}
	calculatedNamespaceNames := sets.NewString(project.Status.CalculatedNamespaces...)
	if calculatedNamespaceNames.Has(namespace.ObjectMeta.Name) {
		calculatedNamespaceNames.Delete(namespace.ObjectMeta.Name)
		project.Status.CalculatedNamespaces = calculatedNamespaceNames.List()
		if project.Status.Clusters != nil {
			clusterUsed, clusterUsedExist := project.Status.Clusters[namespace.Spec.ClusterName]
			if clusterUsedExist {
				for k, v := range namespace.Spec.Hard {
					usedValue, ok := clusterUsed.Used[k]
					if ok {
						usedValue.Sub(v)
						clusterUsed.Used[k] = usedValue
					}
				}
				project.Status.Clusters[namespace.Spec.ClusterName] = clusterUsed
			}
		}
		_, err := deleter.businessClient.Projects().Update(project)
		if err != nil {
			log.Error("Failed to update the project status", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("projectName", namespace.ObjectMeta.Namespace), log.Err(err))
			return err
		}
	}

	return nil
}

最后删除TKEStack Namespace对应的Kubernetes namespace:

代码语言:javascript
复制
func deleteNamespaceOnCluster(deleter *namespacedResourcesDeleter, namespace *v1.Namespace) error {
	kubeClient, err := platformutil.BuildExternalClientSetWithName(deleter.platformClient, namespace.Spec.ClusterName)
	if err != nil {
		log.Error("Failed to create the kubernetes client", log.String("namespaceName", namespace.ObjectMeta.Name), log.String("clusterName", namespace.Spec.ClusterName), log.Err(err))
		return err
	}
	ns, err := kubeClient.CoreV1().Namespaces().Get(namespace.Spec.Namespace, metav1.GetOptions{})
	if err != nil && errors.IsNotFound(err) {
		return nil
	}
	projectName, ok := ns.ObjectMeta.Labels[util.LabelProjectName]
	if !ok {
		return fmt.Errorf("no project label were found on the namespace within the business cluster")
	}
	if projectName != namespace.ObjectMeta.Namespace {
		return fmt.Errorf("the namespace in the business cluster currently belongs to another project")
	}
	background := metav1.DeletePropagationBackground
	deleteOpt := &metav1.DeleteOptions{PropagationPolicy: &background}
	if err := kubeClient.CoreV1().Namespaces().Delete(namespace.Spec.Namespace, deleteOpt); err != nil {
		log.Error("Failed to delete the namespace in the business cluster", log.String("clusterName", namespace.Spec.ClusterName), log.String("namespace", namespace.Spec.Namespace), log.Err(err))
		return err
	}
	return nil
}

通过上述设计,就可以将TKEStack Project的资源配额分配到TKEStack Namespace,而TKEStack Namespace对应Kubernetes的namespace,namespace通过quota-memory-cpu-namespace进行实际的集群资源控制。

总结

上述对TKEStack business模块的Namespace CR进行了分析,其它模块原理类似,都是采用Kubernetes标准的Aggregated APIServer和Controller设计,代码精炼且优雅,不失为Kubernetes AA的最佳实践……

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 腾讯云原生 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • TKEStack Global Cluster
  • Custom Resource
  • Controller
  • Aggregated APIServer
  • Operator
  • TKEStack business Aggregated APIServer
  • 总结
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档