前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【K8s】controller-manager 源码分析 01-01

【K8s】controller-manager 源码分析 01-01

原创
作者头像
Librant
修改2022-06-21 08:16:59
4620
修改2022-06-21 08:16:59
举报
文章被收录于专栏:跟我一起学 K8s跟我一起学 K8s

【注】源码分析均以 k8s 的第一个 commit 代码分析;

controller-manager 的入口函数 main():

代码语言:go
复制
cmd/controller-manager/controller-manager.go

启动 controller-manager 的参数:

代码语言:go
复制
var (
	etcd_servers = flag.String("etcd_servers", "", "Servers for the etcd (http://ip:port).")
	master       = flag.String("master", "", "The address of the Kubernetes API server")
)
  • 指定 etcd 的 server 的节点 IP;
  • 指定 controller-manager 的监听 IP 地址;

初始化 controller-manager 的实例:

代码语言:go
复制
controllerManager := registry.MakeReplicationManager()

函数返回 ReplicationManager 的实例;

代码语言:go
复制
// ReplicationManager is responsible for synchronizing ReplicationController objects stored in etcd
// with actual running tasks.
// TODO: Remove the etcd dependency and re-factor in terms of a generic watch interface
type ReplicationManager struct {
	etcdClient  *etcd.Client
	kubeClient  client.ClientInterface
	taskControl TaskControlInterface
	updateLock  sync.Mutex
}
  • etcd 的客户端
  • task/controller/service 的客户端
  • 任务控制接口
  • 更新自旋锁

这里讲下 client.ClientInterface 接口:

代码语言:go
复制
pkg/client/client.go

// ClientInterface holds the methods for clients of Kubenetes, an interface to allow mock testing
type ClientInterface interface {
	ListTasks(labelQuery map[string]string) (api.TaskList, error)
	GetTask(name string) (api.Task, error)
	DeleteTask(name string) error
	CreateTask(api.Task) (api.Task, error)
	UpdateTask(api.Task) (api.Task, error)

	GetReplicationController(name string) (api.ReplicationController, error)
	CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
	DeleteReplicationController(string) error

	GetService(name string) (api.Service, error)
	CreateService(api.Service) (api.Service, error)
	UpdateService(api.Service) (api.Service, error)
	DeleteService(string) error
}

通过接口可以看到,这里是 Task/Controller/Service 资源的增删改查;

最后启动两个 gorouting 来同步任务和监控任务;

代码语言:go
复制
go util.Forever(func() { controllerManager.Synchronize() }, 20*time.Second)
go util.Forever(func() { controllerManager.WatchControllers() }, 20*time.Second)

1)Synchronize() :副本控制器

  • 定期同步副本数量
代码语言:go
复制
func (rm *ReplicationManager) Synchronize() {}
  • 从 etcd 中获取 controller 的列表信息; -- 获取 json 信息,进行反序列化for _, value := range response.Node.Nodes {}

// 每 10s 同步一次

time.Sleep(10 * time.Second)遍历每个节点,获取每个节点上的 controller 列表信息;

代码语言:go
复制
var controllerSpec ReplicationController
err = rm.syncReplicationController(controllerSpec)

这里对 syncReplicationController() 函数进行分析:

代码语言:go
复制
func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {}
代码语言:go
复制
func (rm *ReplicationManager) syncReplicationController(controllerSpec ReplicationController) error {
	// 加锁,更新任务信息
	rm.updateLock.Lock()
	// 根据 label 标签,获取任务列表
	taskList, err := rm.kubeClient.ListTasks(controllerSpec.DesiredState.ReplicasInSet)
	if err != nil {
		return err
	}
	// 过滤所有退出状态的任务
	filteredList := rm.filterActiveTasks(taskList.Items)
	// 看期望的副本数和当前任务数的差值
	diff := len(filteredList) - controllerSpec.DesiredState.Replicas
	log.Printf("%#v", filteredList)
	if diff < 0 {
		// 将 负数 转换成 正数
		diff *= -1
		log.Printf("Too few replicas, creating %d\n", diff)
		// 根据期望数,使用期望状态创建任务
		for i := 0; i < diff; i++ {
			rm.taskControl.createReplica(controllerSpec)
		}
	} else if diff > 0 {
		// 任务数超过期望数,删除多余的任务
		log.Print("Too many replicas, deleting")
		for i := 0; i < diff; i++ {
			rm.taskControl.deleteTask(filteredList[i].ID)
		}
	}
	// 释放锁
	rm.updateLock.Unlock()
	return nil
}

通过对上面的函数分析,根据 controller 中的期望状态和任务的当前状态进行对比,这个是副本数空值的协程, 每 10s 同步一次;

2)WatchControllers():监控 etcd 中的状态,控制副本数;

代码语言:go
复制
func (rm *ReplicationManager) WatchControllers() {}

函数源码:

代码语言:go
复制
func (rm *ReplicationManager) WatchControllers() {
	// 创建无缓冲的管道
	watchChannel := make(chan *etcd.Response)
	// 通过起协程来监控 etcd 中 key 的状态变化,将变化的信息同步给 controller
	go util.Forever(func() { rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, nil) }, 0)
	for {
		// 持续从管道中读取消息,如果没有变化,这里会阻塞
		watchResponse := <-watchChannel
		// 读出空值,直接休眠 10s,重新读取
		if watchResponse == nil {
			time.Sleep(time.Second * 10)
			continue
		}
		log.Printf("Got watch: %#v", watchResponse)
		// 监控 etcd 中 key 的变化, 对 set 的动作进行响应
		controller, err := rm.handleWatchResponse(watchResponse)
		if err != nil {
			log.Printf("Error handling data: %#v, %#v", err, watchResponse)
			continue
		}
		// 将变动后的信息,同步给副本控制器
		rm.syncReplicationController(*controller)
	}
}
  • 持续监控 etcd 中的值的变化,对于有变化的 key 进行操作;

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档