apiserver在master节点上对外提供kubernetes restful api服务,提供的主要是与集群管理相关的API服务;
用户请求过来后,apiserver服务对请求做获取请求内容、请求内容检查、认证、audit、授权、修改式准入控制、路由、验证式准入控制、资源的格式转换、持久化存储到etcd等功能
api-server就调用CreateServerChain
来创建了一系列的服务,包括:
初始化KubeAPIServer的流程主要有:
http filter chain 的配置、API Group 的注册、http path 与 handler 的关联以及 handler 后端存储 etcd 的配置。
1.CreateKubeAPIServerConfig()为KubeAPIServer生成相关配置及资源master.Config,
2.master.Config.Complete()指定一系列服务参数,即配置文件中的参数map到结构体中,返回CompletedConfig对象
2.1、调用 genericapiserver.NewConfig
生成默认的 genericConfig,genericConfig 中主要配置了 DefaultBuildHandlerChain
,DefaultBuildHandlerChain
中包含了认证、鉴权等一系列 http filter chain;
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,
) (......) {
...
// 1、创建认证实例,支持多种认证方式:请求 Header 认证、Auth 文件认证、CA 证书认证、Bearer token 认证、
// ServiceAccount 认证、BootstrapToken 认证、WebhookToken 认证等
genericConfig.Authentication.Authenticator, genericConfig.OpenAPIConfig.SecurityDefinitions, err = BuildAuthenticator(s, clientgoExternalClient, versionedInformers)
if err != nil {
lastErr = fmt.Errorf("invalid authentication config: %v", err)
return
}
// 2、创建鉴权实例,包含:Node、RBAC、Webhook、ABAC、AlwaysAllow、AlwaysDeny
genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, versionedInformers)
......
serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
authInfoResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, genericConfig.LoopbackClientConfig)
// 3、审计插件的初始化
lastErr = s.Audit.ApplyTo(......)
if lastErr != nil {
return
}
// 4、准入插件的初始化
pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, serviceResolver)
if err != nil {
lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
return
}
err = s.Admission.ApplyTo(......)
if err != nil {
lastErr = fmt.Errorf("failed to initialize admission: %v", err)
}
2.2 初始化默认的http filter chain,整理filters过滤处理函数:
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
if c.FlowControl != nil {
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
} else {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
2.3 http filter chain保存在 APIServerHandler的 FullHandlerChain
type APIServerHandler struct {
// FullHandlerChain is the one that is eventually served with. It should include the full filter
// chain and then call the Director.
FullHandlerChain http.Handler
…
}
2.4 kube-apiserver启动http server
// ServeHTTP makes it an http.Handler
func (a *APIServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
a.FullHandlerChain.ServeHTTP(w, r)
}
3.CompletedConfig为kube apiserver准备restStorageProviders和API接口(InstallAPIs())
4. InstallAPIs()遍历每个apigroups,并通过s.Handler.GoRestfulContainer.Add()注册各个groups里面的rest接口,即将资源rest接口安装到go-restful的router中(kube apiserver使用的是go-restful框架提供api服务)其主要有以下三种 API:core group:主要在 /api/v1
下;named groups:其 path 为 /apis/$NAME/$VERSION
;暴露系统状态的一些 API:如/metrics
、/healthz
等;
4.1例如RESTStorageProvider的v1版本groups下的资源,
func (p RESTStorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
storage := map[string]rest.Storage{}
// deployments
deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
if err != nil {
return storage, err
}
storage["deployments"] = deploymentStorage.Deployment
…
}
4.2为各个资源增加handler
func (a *APIServerHandler) ListedPaths() []string {
var handledPaths []string
// Extract the paths handled using restful.WebService
for _, ws := range a.GoRestfulContainer.RegisteredWebServices() {
handledPaths = append(handledPaths, ws.RootPath())
}
handledPaths = append(handledPaths, a.NonGoRestfulMux.ListedPaths()...)
sort.Strings(handledPaths)
return handledPaths
}
实现:func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
4.3 handler的处理策略
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &apps.Deployment{} },
NewListFunc: func() runtime.Object { return &apps.DeploymentList{} },
DefaultQualifiedResource: apps.Resource("deployments"),
CreateStrategy: deployment.Strategy,
UpdateStrategy: deployment.Strategy,
DeleteStrategy: deployment.Strategy,
TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
}
4.5例如,给post某个resource注册handler:
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
//标准的go-restful注册router方式
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
5.restStorageProviders用于创建RESTstorage,不同类型的资源对应不同的storage,
继续看一下etcd storage是如何创建进来的
5.1生成kube apiserver的 genericoptions时会调用GetRestOpetions()方法获取etcd storage
genericConfig.RESTOptionsGetter = &genericoptions.SimpleRestOptionsFactory{Options: etcdOptions}
func (f *SimpleRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
ret := generic.RESTOptions{
StorageConfig: &f.Options.StorageConfig,
Decorator: generic.UndecoratedStorage,
EnableGarbageCollection: f.Options.EnableGarbageCollection,
DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
ResourcePrefix: resource.Group + "/" + resource.Resource,
CountMetricPollPeriod: f.Options.StorageConfig.CountMetricPollPeriod,
}
if f.Options.EnableWatchCache { //使用watchCache
sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
if err != nil {
return generic.RESTOptions{}, err
}
cacheSize, ok := sizes[resource]
if !ok {
cacheSize = f.Options.DefaultWatchCacheSize
}
// depending on cache size this might return an undecorated storage
ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
}
return ret, nil
}
5.2 引入etcd3storage
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
1.当请求到达 kube-apiserver 时,kube-apiserver 首先会执行在 http filter chain 中注册的过滤器链,该过滤器对其执行一系列过滤操作,主要有认证、鉴权等检查操作。 2.当 filter chain 处理完成后,go-restful框架根据请求的方法路由到相应的handler,回看一下handler的注册
case "POST": // Create a resource.
var handler restful.RouteFunction
if isNamedCreater {
handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
} else {
handler = restfulCreateResource(creater, reqScope, admit)
}
handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, handler)
article := GetArticleForNoun(kind, " ")
doc := "create" + article + kind
if isSubresource {
doc = "create " + subresource + " of" + article + kind
}
route := ws.POST(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
Returns(http.StatusOK, "OK", producedObject).
// TODO: in some cases, the API may return a v1.Status instead of the versioned object
// but currently go-restful can't handle multiple different objects being returned.
Returns(http.StatusCreated, "Created", producedObject).
Returns(http.StatusAccepted, "Accepted", producedObject).
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
3.kube apiserver通过restfulCreateNamedResource()创建handler:
// CreateNamedResource returns a function that will handle a resource creation with name.
func CreateNamedResource(r rest.NamedCreater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
return createHandler(r, scope, admission, true)
}
4.createHandler()处理代码很多,简述几个重要点
5.Decoder 会首先把 creater object 转换到 internal version,然后将其转换为 storage version,storage version 是在 etcd 中存储时的另一个 version。
6.准入控制从这里开始调用的,kube-apiserver 在启动时通过设置 --enable-admission-plugins 参数来开启需要使用的插件,通过 ValidatingAdmissionWebhook 或 MutatingAdmissionWebhook 添加的插件也都会在此处进行工作。
可以看到MutatingAdmissionWebhook都会在ValidatingAdmissionWebhook之前调用
admissionAttributes := admission.NewAttributesRecord(obj, nil, scope.Kind, namespace, name, scope.Resource, scope.Subresource, admission.Create, options, dryrun.IsDryRun(options.DryRun), userInfo)
if mutatingAdmission, ok := admit.(admission.MutationInterface); ok && mutatingAdmission.Handles(admission.Create) {
if err := mutatingAdmission.Admit(ctx, admissionAttributes, scope); err != nil {
return nil, err
}
}
6.1 回过头看看admission controller的初始化及调用,
func NewAdmissionOptions() *AdmissionOptions {
options := genericoptions.NewAdmissionOptions()
// register all admission plugins
RegisterAllAdmissionPlugins(options.Plugins)
func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
admit.Register(plugins) // DEPRECATED as no real meaning
alwayspullimages.Register(plugins)
antiaffinity.Register(plugins)
defaulttolerationseconds.Register(plugins)
…
}
6.2 admission plugin注册时具有顺序性
// enabledPluginNames makes use of RecommendedPluginOrder, DefaultOffPlugins,
// EnablePlugins, DisablePlugins fields
// to prepare a list of ordered plugin names that are enabled.
func (a *AdmissionOptions) enabledPluginNames() []string {
allOffPlugins := append(a.DefaultOffPlugins.List(), a.DisablePlugins...)
disabledPlugins := sets.NewString(allOffPlugins...)
enabledPlugins := sets.NewString(a.EnablePlugins...)
disabledPlugins = disabledPlugins.Difference(enabledPlugins)
orderedPlugins := []string{}
for _, plugin := range a.RecommendedPluginOrder {
if !disabledPlugins.Has(plugin) {
orderedPlugins = append(orderedPlugins, plugin)
}
}
return orderedPlugins
}
注册到kube apiserver
admissionChain, err := a.Plugins.NewFromPlugins(pluginNames, pluginsConfigProvider, initializersChain, a.Decorators)
if err != nil {
return err
}
c.AdmissionControl = admissionmetrics.WithStepMetrics(admissionChain)
7.Validation主要检查 object 中字段的合法性
kube-apiserver 共由 3 个组件构成(Aggregator、KubeAPIServer、APIExtensionServer),这些组件依次通过 Delegation 处理请求:
Aggregator:暴露的功能类似于一个七层负载均衡,将来自用户的请求拦截转发给其他服务器,并且负责整个 APIServer 的 Discovery 功能;
KubeAPIServer :负责对请求的一些通用处理,认证、鉴权等,以及处理各个内建资源的 REST 服务;
APIExtensionServer:主要处理 CustomResourceDefinition(CRD)和 CustomResource(CR)的 REST 请求,也是 Delegation 的最后一环,如果对应 CR 不能被处理的话则会返回 404。
Aggregator 和 APIExtensionsServer 对应两种主要扩展 APIServer 资源的方式,即分别是 AA 和 CRD。
AggregatorAggregator 通过 APIServices 对象关联到某个 Service 来进行请求的转发,其关联的 Service 类型进一步决定了请求转发形式。Aggregator 包括一个 GenericAPIServer
和维护自身状态的 Controller。
其中 GenericAPIServer
主要处理 apiregistration.k8s.io
组下的 APIService 资源请求。Aggregator 除了处理资源请求外还包含几个 controller:
1、apiserviceRegistrationController
:负责 APIServices 中资源的注册与删除;
2、availableConditionController
:维护 APIServices 的可用状态,包括其引用 Service 是否可用等;
3、autoRegistrationController
:用于保持 API 中存在的一组特定的 APIServices;
4、crdRegistrationController
:负责将 CRD GroupVersions 自动注册到 APIServices 中;
5、openAPIAggregationController
:将 APIServices 资源的变化同步至提供的 OpenAPI 文档;
kubernetes 中的一些附加组件,比如 metrics-server 就是通过 Aggregator 的方式进行扩展的,实际环境中可以通过使用 apiserver-builder 工具轻松以 Aggregator 的扩展方式创建自定义资源。
在 kube-apiserver 中需要增加以下配置来开启 API Aggregation:
--proxy-client-cert-file=/etc/kubernetes/certs/proxy.crt
--proxy-client-key-file=/etc/kubernetes/certs/proxy.key
--requestheader-client-ca-file=/etc/kubernetes/certs/proxy-ca.crt
--requestheader-allowed-names=aggregator
--requestheader-extra-headers-prefix=X-Remote-Extra-
--requestheader-group-headers=X-Remote-Group
--requestheader-username-headers=X-Remote-User
KubeAPIServer 主要是提供对 API Resource 的操作请求,为 kubernetes 中众多 API 注册路由信息,暴露 RESTful API 并且对外提供 kubernetes service,使集群中以及集群外的服务都可以通过 RESTful API 操作 kubernetes 中的资源。
APIExtensionServer主要处理 CustomResourceDefinition(CRD)和 CustomResource(CR)的 REST 请求,也是 Delegation 的最后一环,如果对应 CR 不能被处理的话则会返回 404。
APIExtensionServer 作为 Delegation 链的最后一层,是处理所有用户通过 Custom Resource Definition 定义的资源服务器。
其中包含的 controller 以及功能如下所示:
1、openapiController:将 crd 资源的变化同步至提供的 OpenAPI 文档,可通过访问 /openapi/v2 进行查看; 2、crdController:负责将 crd 信息注册到 apiVersions 和 apiResources 中,两者的信息可通过
$ kubectl api-versions 和 $ kubectl api-resources 查看; 3、namingController:检查 crd obj 中是否有命名冲突,可在 crd .status.conditions 中查看; 4、establishingController:检查 crd 是否处于正常状态,可在 crd .status.conditions 中查看; 5、nonStructuralSchemaController:检查 crd obj 结构是否正常,可在 crd .status.conditions 中查看; 6、apiApprovalController:检查 crd 是否遵循 kubernetes API 声明策略,可在 crd .status.conditions 中查看; 7、finalizingController:类似于finalizes 的功能,与CRs 的删除有关;
APIExtensionServer为CR注册route的工作由来crdController处理crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)crdcontroller通过informer监听crd资源,当crd资源创建或者删除,crdcontroller做相应的注册和取消route操作
c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup))
if !foundVersion {
c.versionHandler.unsetDiscovery(version)
return nil
}
c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
return apiResourcesForDiscovery
})))
1.kubectl使用kubectl命令,跟普通k8s资源一样使用CRD资源,如:
kubectl api-versionskubectl api-resources
2.client 使用例子:
// NewCRDClient is used to create a restClient for crd
func NewCRDClient(cfg *rest.Config) (*rest.RESTClient, error) {
scheme := runtime.NewScheme()
schemeBuilder := runtime.NewSchemeBuilder(addDeviceCrds)
err := schemeBuilder.AddToScheme(scheme)
if err != nil {
return nil, err
}
config := *cfg
config.APIPath = "/apis"
config.GroupVersion = &v1alpha1.SchemeGroupVersion
config.ContentType = runtime.ContentTypeJSON
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: serializer.NewCodecFactory(scheme)}
client, err := rest.RESTClientFor(&config)
if err != nil {
log.Fatalf("Failed to create REST Client due to error %v", err)
return nil, err
}
return client, nil
}
func addDeviceCrds(scheme *runtime.Scheme) error {
// Add Device
scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.Device{}, &v1alpha1.DeviceList{})
v1.AddToGroupVersion(scheme, v1alpha1.SchemeGroupVersion)
// Add DeviceModel
scheme.AddKnownTypes(v1alpha1.SchemeGroupVersion, &v1alpha1.DeviceModel{}, &v1alpha1.DeviceModelList{})
v1.AddToGroupVersion(scheme, v1alpha1.SchemeGroupVersion)
return nil
}
参考:
https://cloud.tencent.com/developer/article/1591764
https://kubernetes.io/docs/tasks/access-kubernetes-api/setup-extension-api-server/
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。