这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
kubectl get pods -n kube-system
NAME READY STATUS RESTARTS AGE
coredns-78fcd69978-jztff 1/1 Running 6 (35d ago) 125d
coredns-78fcd69978-ts7gq 1/1 Running 6 (35d ago) 125d
etcd-hedy 1/1 Running 6 (35d ago) 125d
kube-apiserver-hedy 1/1 Running 7 (35d ago) 125d
kube-controller-manager-hedy 1/1 Running 11 (30h ago) 125d
kube-proxy-2qx6k 1/1 Running 6 125d
kube-scheduler-hedy 1/1 Running 11 (30h ago) 125d
kubectl describe pod kube-controller-manager-hedy -n kube-system
Command:
kube-controller-manager
--allocate-node-cidrs=true
--authentication-kubeconfig=/etc/kubernetes/controller-manager.conf
--authorization-kubeconfig=/etc/kubernetes/controller-manager.conf
--bind-address=0.0.0.0
--client-ca-file=/etc/kubernetes/pki/ca.crt
--cluster-cidr=100.64.0.0/10
--cluster-name=kubernetes
--cluster-signing-cert-file=/etc/kubernetes/pki/ca.crt
--cluster-signing-key-file=/etc/kubernetes/pki/ca.key
--controllers=*,bootstrapsigner,tokencleaner
--experimental-cluster-signing-duration=876000h
--feature-gates=TTLAfterFinished=true,EphemeralContainers=true
--kubeconfig=/etc/kubernetes/controller-manager.conf
--leader-elect=true
--port=0
--requestheader-client-ca-file=/etc/kubernetes/pki/front-proxy-ca.crt
--root-ca-file=/etc/kubernetes/pki/ca.crt
--service-account-private-key-file=/etc/kubernetes/pki/sa.key
--service-cluster-ip-range=10.96.0.0/22
--use-service-account-credentials=true
func main() {
command := app.NewControllerManagerCommand()
code := cli.Run(command)
os.Exit(code)
}
// Excerpt (the enclosing function is not shown): create the option set and
// attach every group of flags to the cobra command's flag set.
s, err := options.NewKubeControllerManagerOptions()
// fs is the command's own flag set; every named group below is merged into it.
// (Note: flags here are the cobra notion of command-line flags.)
fs := cmd.Flags()
// s.Flags returns the option flags split into named groups (namedFlagSets),
// addressed by section name, e.g. "global" below. Per the original author's
// note, controllers each contribute their own group of flags.
namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
verflag.AddFlags(namedFlagSets.FlagSet("global"))
globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
registerLegacyGlobalFlags(namedFlagSets)
// Merge every named group into the command's flag set.
for _, f := range namedFlagSets.FlagSets {
	fs.AddFlagSet(f)
}
// RunE is executed when the cobra command runs. It handles --version, applies
// the logging options, prints all flags, builds the controller-manager config,
// and then hands control to Run.
RunE: func(cmd *cobra.Command, args []string) error {
	// If "--version" was passed on the command line, print version info and exit the process.
	verflag.PrintAndExitIfRequested()
	// Validate the logging options (format, output location, etc.) and make them take effect.
	if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
		return err
	}
	// Print the name and value of every flag on the command (flags in the cobra sense).
	cliflag.PrintFlags(cmd.Flags())
	// Validate the options and build the config object, including the client-go
	// clientset used to talk to the API server. This step holds important logic.
	c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
	if err != nil {
		return err
	}
	// Register feature-gate metrics.
	utilfeature.DefaultMutableFeatureGate.AddMetrics()
	// Start the controller-manager's main logic with a background context.
	return Run(context.Background(), c.Complete())
}
// NewControllerInitializers returns every controller this binary knows how to
// run, keyed by controller name, mapped to the InitFunc that creates and starts it.
//
// The cloud-related controllers ("service", "route", "cloud-node-lifecycle")
// are only included when loopMode == IncludeCloudLoops. "storage-version-gc",
// "resource-claim-controller" and "legacy-service-account-token-cleaner" are
// only included when their feature gates are enabled.
//
// Controller names must be unique. A name that appears twice is a programming
// error and causes a panic.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
	// entry pairs a controller name with its initializer; the table below is
	// processed in order.
	type entry struct {
		name string
		init InitFunc
	}

	table := []entry{
		{"endpoint", startEndpointController},
		{"endpointslice", startEndpointSliceController},
		{"endpointslicemirroring", startEndpointSliceMirroringController},
		{"replicationcontroller", startReplicationController},
		{"podgc", startPodGCController},
		{"resourcequota", startResourceQuotaController},
		{"namespace", startNamespaceController},
		{"serviceaccount", startServiceAccountController},
		{"garbagecollector", startGarbageCollectorController},
		{"daemonset", startDaemonSetController},
		{"job", startJobController},
		{"deployment", startDeploymentController},
		{"replicaset", startReplicaSetController},
		{"horizontalpodautoscaling", startHPAController},
		{"disruption", startDisruptionController},
		{"statefulset", startStatefulSetController},
		{"cronjob", startCronJobController},
		{"csrsigning", startCSRSigningController},
		{"csrapproving", startCSRApprovingController},
		{"csrcleaner", startCSRCleanerController},
		{"ttl", startTTLController},
		{"bootstrapsigner", startBootstrapSignerController},
		{"tokencleaner", startTokenCleanerController},
		{"nodeipam", startNodeIpamController},
		{"nodelifecycle", startNodeLifecycleController},
	}

	if loopMode == IncludeCloudLoops {
		table = append(table,
			entry{"service", startServiceController},
			entry{"route", startRouteController},
			entry{"cloud-node-lifecycle", startCloudNodeLifecycleController},
		)
		// TODO: volume controller into the IncludeCloudLoops only set.
	}

	table = append(table,
		entry{"persistentvolume-binder", startPersistentVolumeBinderController},
		entry{"attachdetach", startAttachDetachController},
		entry{"persistentvolume-expander", startVolumeExpandController},
		entry{"clusterrole-aggregation", startClusterRoleAggregrationController},
		entry{"pvc-protection", startPVCProtectionController},
		entry{"pv-protection", startPVProtectionController},
		entry{"ttl-after-finished", startTTLAfterFinishedController},
		entry{"root-ca-cert-publisher", startRootCACertPublisher},
		entry{"ephemeral-volume", startEphemeralVolumeController},
	)

	// Feature-gated controllers.
	gates := utilfeature.DefaultFeatureGate
	if gates.Enabled(genericfeatures.APIServerIdentity) && gates.Enabled(genericfeatures.StorageVersionAPI) {
		table = append(table, entry{"storage-version-gc", startStorageVersionGCController})
	}
	if gates.Enabled(kubefeatures.DynamicResourceAllocation) {
		table = append(table, entry{"resource-claim-controller", startResourceClaimController})
	}
	if gates.Enabled(kubefeatures.LegacyServiceAccountTokenCleanUp) {
		table = append(table, entry{"legacy-service-account-token-cleaner", startLegacySATokenCleaner})
	}

	byName := make(map[string]InitFunc, len(table))
	for _, e := range table {
		if _, taken := byName[e.name]; taken {
			panic(fmt.Sprintf("controller name %q was registered twice", e.name))
		}
		byName[e.name] = e.init
	}
	return byName
}
// Config validates the options and turns them into the runtime configuration
// used to start kube-controller-manager.
//
// allControllers and disabledByDefaultControllers are passed to Validate.
// The returned Config holds:
//   - a rest config built from the --master / kubeconfig settings, adjusted
//     with the client-connection options;
//   - a client-go clientset tagged with KubeControllerManagerUserAgent;
//   - an event broadcaster and a recorder whose event source component is
//     KubeControllerManagerUserAgent.
//
// The remaining option values are copied in by ApplyTo. On any validation,
// certificate, client or ApplyTo error, a nil Config and the error are
// returned. The certificate error is wrapped with %w so callers can inspect it.
func (s KubeControllerManagerOptions) Config(allControllers []string, disabledByDefaultControllers []string) (*kubecontrollerconfig.Config, error) {
	// Validate every controller's options before building anything.
	if err := s.Validate(allControllers, disabledByDefaultControllers); err != nil {
		return nil, err
	}

	// Create a self-signed serving certificate for localhost / 127.0.0.1 if
	// MaybeDefaultWithSelfSignedCerts decides one is needed.
	if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts("localhost", nil, []net.IP{netutils.ParseIPSloppy("127.0.0.1")}); err != nil {
		// %w (not %v) keeps the cause reachable via errors.Is / errors.As.
		return nil, fmt.Errorf("error creating self-signed certificates: %w", err)
	}

	// Build the restclient.Config that the client-go clientset is created from.
	kubeconfig, err := clientcmd.BuildConfigFromFlags(s.Master, s.Generic.ClientConnection.Kubeconfig)
	if err != nil {
		return nil, err
	}
	kubeconfig.DisableCompression = true
	kubeconfig.ContentConfig.AcceptContentTypes = s.Generic.ClientConnection.AcceptContentTypes
	kubeconfig.ContentConfig.ContentType = s.Generic.ClientConnection.ContentType
	kubeconfig.QPS = s.Generic.ClientConnection.QPS
	kubeconfig.Burst = int(s.Generic.ClientConnection.Burst)

	// The client-go clientset is what lets controllers read, write and watch
	// Kubernetes resources.
	client, err := clientset.NewForConfig(restclient.AddUserAgent(kubeconfig, KubeControllerManagerUserAgent))
	if err != nil {
		return nil, err
	}

	// Event broadcaster, plus a recorder that stamps events with this component's name.
	eventBroadcaster := record.NewBroadcaster()
	eventRecorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, v1.EventSource{Component: KubeControllerManagerUserAgent})

	c := &kubecontrollerconfig.Config{
		Client:           client,
		Kubeconfig:       kubeconfig,
		EventBroadcaster: eventBroadcaster,
		EventRecorder:    eventRecorder,
	}
	// Copy the remaining option values from s into c.
	if err := s.ApplyTo(c); err != nil {
		return nil, err
	}
	s.Metrics.Apply()

	return c, nil
}
// Excerpt from Run. run, saTokenControllerInitFunc, startSATokenController,
// electionChecker, leaderMigrator and logger are not defined in this excerpt.
//
// Without leader election (e.g. a single fixed instance), start the
// controllers directly via run and return early.
// This cluster's kube-controller-manager command line includes
// --leader-elect=true, so it takes the leader-election path below instead.
if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
	run(ctx, saTokenControllerInitFunc, NewControllerInitializers)
	return nil
}
// With leader election, several controller-manager processes may run but only
// the leader starts the controllers, so each process needs a unique identity:
// hostname + "_" + a UUID.
id, err := os.Hostname()
if err != nil {
	return err
}
id = id + "_" + string(uuid.NewUUID())
go leaderElectAndRun(ctx, c, id, electionChecker,
	c.ComponentConfig.Generic.LeaderElection.ResourceLock,
	c.ComponentConfig.Generic.LeaderElection.ResourceName,
	leaderelection.LeaderCallbacks{
		// OnStartedLeading runs once this process has won the election.
		OnStartedLeading: func(ctx context.Context) {
			// NewControllerInitializers builds a map whose keys are controller
			// names and whose values are the controllers' init functions.
			initializersFunc := NewControllerInitializers
			if leaderMigrator != nil {
				// If leader migration is enabled, we should start only non-migrated controllers
				// for the main lock.
				initializersFunc = createInitializersFunc(leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
				logger.Info("leader migration: starting main controllers.")
			}
			run(ctx, startSATokenController, initializersFunc)
		},
		// OnStoppedLeading runs when leadership is lost. It logs the loss, then
		// klog.FlushAndExit flushes the logs and ends the process with exit code 1.
		OnStoppedLeading: func() {
			logger.Error(nil, "leaderelection lost")
			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
		},
	})
// run builds the shared controller context, starts every controller returned
// by initializersFunc, starts the informers, and then blocks until ctx is done.
// Any failure while building the context or starting controllers is logged and
// the process exits with code 1 via klog.FlushAndExit.
run := func(ctx context.Context, startSATokenController InitFunc, initializersFunc ControllerInitializersFunc) {
	// Build the controller context: the shared objects (clients, informer
	// factories, ...) that every controller uses.
	controllerContext, err := CreateControllerContext(logger, c, rootClientBuilder, clientBuilder, ctx.Done())
	if err != nil {
		logger.Error(err, "Error building controller context")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	// initializersFunc is NewControllerInitializers (or a filtered variant of it);
	// it returns every controller together with its init function.
	controllerInitializers := initializersFunc(controllerContext.LoopMode)
	// StartControllers iterates over controllerInitializers and initializes and
	// starts each controller.
	if err := StartControllers(ctx, controllerContext, startSATokenController, controllerInitializers, unsecuredMux, healthzHandler); err != nil {
		logger.Error(err, "Error starting controllers")
		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
	}
	// Start all informers.
	// NOTE(review): stopCh is not defined in this closure. It comes from the
	// enclosing scope, while the controller context above is built with this
	// closure's ctx.Done(). Confirm that stopping the informers on stopCh
	// rather than ctx.Done() is intended.
	controllerContext.InformerFactory.Start(stopCh)
	controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
	// Closing InformersStarted signals anything waiting on it that the informers
	// have been started.
	close(controllerContext.InformersStarted)
	// Block until ctx is cancelled.
	<-ctx.Done()
}
// CreateControllerContext builds the ControllerContext shared by all controllers.
//
// It assembles:
//   - a typed shared informer factory and a metadata informer factory, both
//     backed by clients from rootClientBuilder;
//   - a discovery-backed REST mapper that is reset every 30 seconds until stop
//     is closed;
//   - the available API resources;
//   - the cloud provider and loop mode from the KubeCloudShared settings.
//
// Before talking to the API server for discovery, it waits for the API server
// to become healthy (10s timeout). A zero ControllerContext and an error are
// returned if that wait, resource discovery, or cloud-provider creation fails.
// The wait error is wrapped with %w so callers can inspect the cause.
func CreateControllerContext(logger klog.Logger, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
	// NOTE(review): ResyncPeriod(s)() is evaluated separately for each factory,
	// presumably so each gets its own (possibly randomized) period. Do not merge
	// the two calls without confirming this.
	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

	metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())

	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
		// %w (not %v) keeps the underlying error reachable via errors.Is / errors.As.
		return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %w", err)
	}

	// Use a discovery client capable of being refreshed.
	discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
	// Reset the mapper every 30s (until stop is closed) so newly served API
	// resources are rediscovered.
	go wait.Until(func() {
		restMapper.Reset()
	}, 30*time.Second, stop)

	availableResources, err := GetAvailableResources(rootClientBuilder)
	if err != nil {
		return ControllerContext{}, err
	}

	cloud, loopMode, err := createCloudProvider(logger, s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
	if err != nil {
		return ControllerContext{}, err
	}

	// Named controllerCtx (not ctx) so it is not mistaken for a context.Context.
	controllerCtx := ControllerContext{
		ClientBuilder:                   clientBuilder,
		InformerFactory:                 sharedInformers,
		ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
		ComponentConfig:                 s.ComponentConfig,
		RESTMapper:                      restMapper,
		AvailableResources:              availableResources,
		Cloud:                           cloud,
		LoopMode:                        loopMode,
		InformersStarted:                make(chan struct{}),
		ResyncPeriod:                    ResyncPeriod(s),
		ControllerManagerMetrics:        controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
	}
	controllersmetrics.Register()
	return controllerCtx, nil
}
// Excerpt repeated from CreateControllerContext: block until the API server
// reports healthy, giving up after the 10-second timeout passed here.
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
	return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}
// waitForWithContext evaluates fn once each time the channel returned by wait
// delivers a value or is observed closed.
//
// wait is called with a context derived from context.Background() (not from
// ctx); that context is cancelled when this function returns. fn is evaluated
// with ctx via runConditionWithCrashProtectionWithContext.
//
// Result:
//   - nil as soon as fn reports done;
//   - fn's error as soon as fn returns one;
//   - ErrWaitTimeout once the wait channel has closed and fn is still not done;
//   - ErrWaitTimeout when ctx is cancelled. ctx.Err() is not returned, for
//     backward compatibility; the PollUntilContext* functions return it.
//
// NOTE(review): callers such as the API-server health wait presumably pass a
// wait func that ticks periodically (about once per second) and closes its
// channel on timeout. Confirm against the caller.
func waitForWithContext(ctx context.Context, wait waitWithContextFunc, fn ConditionWithContextFunc) error {
	tickCtx, stopTicks := context.WithCancel(context.Background())
	defer stopTicks()
	ticks := wait(tickCtx)

	for {
		// Block until either a tick (or close) arrives or ctx is cancelled.
		var stillOpen bool
		select {
		case <-ctx.Done():
			// returning ctx.Err() will break backward compatibility, use new PollUntilContext*
			// methods instead
			return ErrWaitTimeout
		case _, stillOpen = <-ticks:
		}

		// fn is checked even on the final (closing) receive, so a condition that
		// turns true at the deadline still counts as success.
		done, condErr := runConditionWithCrashProtectionWithContext(ctx, fn)
		switch {
		case condErr != nil:
			return condErr
		case done:
			return nil
		case !stillOpen:
			return ErrWaitTimeout
		}
	}
}
// Excerpt from StartControllers: start each controller from the controllers
// map in turn, then register the collected health checks.
for controllerName, initFn := range controllers {
	// Controllers disabled by configuration are logged and skipped.
	if !controllerCtx.IsControllerEnabled(controllerName) {
		logger.Info("Warning: controller is disabled", "controller", controllerName)
		continue
	}
	// Stagger controller starts: sleep for ControllerStartInterval with
	// ControllerStartJitter applied.
	time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
	logger.V(1).Info("Starting controller", "controller", controllerName)
	// Calling initFn initializes and starts the controller. initFn is the value
	// stored for this name in the controllers map passed in.
	ctrl, started, err := initFn(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx)
	if err != nil {
		// A real start error aborts StartControllers entirely.
		logger.Error(err, "Error starting controller", "controller", controllerName)
		return err
	}
	// started == false with no error means the controller chose not to run.
	// It is skipped (logged as a warning), not treated as a failure.
	if !started {
		logger.Info("Warning: skipping controller", "controller", controllerName)
		continue
	}
	// Default health check is a simple named ping for this controller.
	check := controllerhealthz.NamedPingChecker(controllerName)
	if ctrl != nil {
		// check if the controller supports and requests a debugHandler
		// and it needs the unsecuredMux to mount the handler onto.
		if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
			if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
				basePath := "/debug/controllers/" + controllerName
				unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
				unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
			}
		}
		// If the controller provides its own health checker, use it instead of
		// the default ping. Either way the check is collected and registered
		// together with the others after the loop.
		if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
			if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
				check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
			}
		}
	}
	controllerChecks = append(controllerChecks, check)
	logger.Info("Started controller", "controller", controllerName)
}
// Register all collected checks with the healthz handler, one named check per
// started controller. Presumably each is exposed under its own health path so
// external callers can probe individual controllers (similar to per-route
// registration in gin); confirm against the healthz handler.
healthzHandler.AddHealthChecker(controllerChecks...)