containerd Source Code Analysis

This article analyzes the source code of containerd v0.2.4.

## containerd source code flow diagram

Interface call details

From ctr to containerd-api

#### checkpoint (used for snapshots; Docker's support for this feature is currently incomplete)

| ctr cmd | containerd-api |
| ----------- | -------- |
| list | /types.API/ListCheckpoint |
| create | /types.API/CreateCheckpoint |
| delete | /types.API/DeleteCheckpoint |

#### containers

| ctr cmd | containerd-api |
| ----------- | -------- |
| list, state | /types.API/State |
| pause, resume, update | /types.API/UpdateContainer |
| create | /types.API/CreateContainer |
| stats | /types.API/Stats |
| watch | /types.API/State, /types.API/Events |
| exec | /types.API/Events, /types.API/AddProcess, /types.API/UpdateProcess |
| kill | /types.API/Signal |
| start | /types.API/Events, /types.API/CreateContainer, /types.API/UpdateProcess |
| update | /types.API/UpdateContainer |

#### events

/types.API/Events

#### state

/types.API/State

#### version

/types.API/GetServerVersion (returns the result)

From containerd-api to supervisor task handling

Note: API (server.go) --> daemon --> supervisor.go (handleTask func)

#### checkpoint

| containerd-api | supervisor |
| ----------- | -------- |
| /types.API/ListCheckpoint (supervisor.GetContainersTask) | getContainers |
| /types.API/CreateCheckpoint | createCheckpoint |
| /types.API/DeleteCheckpoint | deleteCheckpoint |

#### containers

| containerd-api | supervisor |
| ----------- | -------- |
| /types.API/State, /types.API/Stats (supervisor.GetContainersTask) | getContainers |
| /types.API/UpdateContainer (supervisor.UpdateTask) | updateContainer |
| /types.API/CreateContainer (supervisor.StartTask) | start |
| /types.API/Events | Events |
| /types.API/AddProcess | addProcess |
| /types.API/UpdateProcess | updateProcess |
| /types.API/Signal | signal |

From supervisor to the runtime (runC)

#### checkpoint

| supervisor | runtime |
| ----------- | -------- |
| getContainers | - |
| createCheckpoint | (runtime) CheckPoint --> exec.Command(c.runtime, arg....) |
| deleteCheckpoint | (runtime) DeleteCheckpoint |

#### containers

| supervisor | runtime |
| ----------- | -------- |
| getContainers | - |
| updateContainer | (runtime) Resume, Pause, UpdateResources --> exec.Command(c.runtime, arg....) |
| start | (runtime, supervisor/worker.go) Start --> exec.Command(c.shim, c.id, c.bundle, c.runtime) |
| addProcess | (runtime) exec --> exec.Command(c.shim, c.id, c.bundle, c.runtime) |
| updateProcess | - |
| signal | - |

## Code walkthrough using createContainer as an example

### The daemon starts the goroutines that listen on tasks and startTasks

#### main.go: the main function calls daemon

app.Action = func(context *cli.Context) {
       if err := daemon(context); err != nil {
              logrus.Fatal(err)
       }
}

#### main.go: the daemon function

for i := 0; i < 10; i++ {
       wg.Add(1)
       w := supervisor.NewWorker(sv, wg)
       go w.Start()
}
if err := sv.Start(); err != nil {
       return err
}

#### supervisor/worker.go: NewWorker creates a worker, which then listens on startTasks and handles each task

func NewWorker(s *Supervisor, wg *sync.WaitGroup) Worker {
       return &worker{
              s:  s,
              wg: wg,
       }
}

func (w *worker) Start() {
       defer w.wg.Done()
       for t := range w.s.startTasks {
              started := time.Now()
              process, err := t.Container.Start(t.CheckpointPath, runtime.NewStdio(t.Stdin, t.Stdout, t.Stderr))
              if err != nil {
                     logrus.WithFields(logrus.Fields{
                            "error": err,
                            "id":    t.Container.ID(),
                     }).Error("containerd: start container")
                     t.Err <- err
                     evt := &DeleteTask{
                            ID:      t.Container.ID(),
                            NoEvent: true,
                            Process: process,
                     }
                     w.s.SendTask(evt)
                     continue
              } 
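
The worker excerpt above follows a common Go pattern: several goroutines range over one shared channel and report each result on a channel carried by the task itself. The block below is a minimal sketch of that pattern only. The names `startTask`, `worker`, and `runPool`, and the stub start function, are hypothetical and are not containerd code.

```go
package main

import (
	"fmt"
	"sync"
)

// startTask is a hypothetical stand-in for supervisor.StartTask.
type startTask struct {
	ID  string
	Err chan error // buffered so the worker never blocks on reporting
}

// worker drains the shared queue until the queue is closed.
func worker(queue <-chan *startTask, wg *sync.WaitGroup, start func(id string) error) {
	defer wg.Done()
	for t := range queue {
		t.Err <- start(t.ID) // report the outcome back to the submitter
	}
}

// runPool starts n workers, submits one task per id, and returns how
// many tasks finished without an error.
func runPool(n int, ids []string) int {
	queue := make(chan *startTask, len(ids))
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go worker(queue, &wg, func(id string) error {
			if id == "" {
				return fmt.Errorf("empty container id")
			}
			return nil // a real worker would start the container here
		})
	}

	tasks := make([]*startTask, 0, len(ids))
	for _, id := range ids {
		t := &startTask{ID: id, Err: make(chan error, 1)}
		tasks = append(tasks, t)
		queue <- t
	}
	close(queue) // lets every worker's range loop terminate
	wg.Wait()

	ok := 0
	for _, t := range tasks {
		if err := <-t.Err; err == nil {
			ok++
		}
	}
	return ok
}

func main() {
	fmt.Println(runPool(10, []string{"a", "b", "", "c"}))
}
```

Because every worker reads from the same channel, a slow container start blocks only one worker while the others keep pulling tasks.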

### supervisor/supervisor.go: Start listens on tasks and handles each one

func (s *Supervisor) Start() error {
       logrus.WithFields(logrus.Fields{
              "stateDir":    s.stateDir,
              "runtime":     s.runtime,
              "runtimeArgs": s.runtimeArgs,
              "memory":      s.machine.Memory,
              "cpus":        s.machine.Cpus,
       }).Debug("containerd: supervisor running")
       go func() {
              for i := range s.tasks {
                     s.handleTask(i)
 
              }
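
The loop above hands every queued task to handleTask from a single goroutine, so task handling is serialized. As a rough sketch of how such a dispatcher can be organized, the block below uses a type switch and returns the result on a per-task error channel. All types here (`Task`, `baseTask`, `StartTask`, `SignalTask`, the `handled` field) are simplified, hypothetical stand-ins and do not reproduce the real supervisor.

```go
package main

import "fmt"

// Task is a hypothetical stand-in for the supervisor's task interface.
type Task interface {
	ErrorCh() chan error
}

// baseTask carries the channel on which the result is reported.
type baseTask struct{ errCh chan error }

func (b *baseTask) ErrorCh() chan error { return b.errCh }

type StartTask struct {
	baseTask
	ID string
}

type SignalTask struct {
	baseTask
	ID     string
	Signal int
}

type Supervisor struct {
	tasks   chan Task
	handled []string // written only by the loop goroutine
}

func NewSupervisor() *Supervisor {
	return &Supervisor{tasks: make(chan Task, 16)}
}

// Start launches the single goroutine that serializes all task handling.
// The returned channel is closed once the task queue has been drained.
func (s *Supervisor) Start() <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for t := range s.tasks {
			s.handleTask(t)
		}
	}()
	return done
}

// handleTask dispatches on the concrete task type and reports the result.
func (s *Supervisor) handleTask(t Task) {
	var err error
	switch v := t.(type) {
	case *StartTask:
		s.handled = append(s.handled, "start "+v.ID)
	case *SignalTask:
		s.handled = append(s.handled, fmt.Sprintf("signal %d to %s", v.Signal, v.ID))
	default:
		err = fmt.Errorf("unknown task type %T", t)
	}
	t.ErrorCh() <- err
}

// SendTask enqueues a task; the caller then waits on its ErrorCh.
func (s *Supervisor) SendTask(t Task) { s.tasks <- t }

func main() {
	sv := NewSupervisor()
	done := sv.Start()
	start := &StartTask{baseTask: baseTask{errCh: make(chan error, 1)}, ID: "redis"}
	sv.SendTask(start)
	fmt.Println("start error:", <-start.ErrorCh())
	close(sv.tasks)
	<-done
	fmt.Println(sv.handled)
}
```

The caller side of this pattern is the same shape as the `s.sv.SendTask(e)` followed by `<-e.ErrorCh()` sequence used by the API server.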

### Container creation example

The CLI command entry point is containersCommand in ctr/main.go:

execCommand,
killCommand,
listCommand,
pauseCommand,
resumeCommand,
startCommand,
stateCommand,
statsCommand,
watchCommand,
updateCommand,

#### ctr/container.go

var startCommand = cli.Command{
       Name:      "start",
       Usage:     "start a container",
       ArgsUsage: "ID BundlePath",
       // ...
 
        events, err := c.Events(netcontext.Background(), &types.EventsRequest{}) /* create the event stream */
       if err != nil {
              fatal(err.Error(), 1)
       }
       if _, err := c.CreateContainer(netcontext.Background(), r); err != nil { /* create the container */
              fatal(err.Error(), 1)
       }
       if context.Bool("attach") {
              go func() {
                     io.Copy(stdin, os.Stdin)
                     if _, err := c.UpdateProcess(netcontext.Background(), &types.UpdateProcessRequest{ /* update the process */
                            Id:         id,
                            Pid:        "init",
                            CloseStdin: true,
                     }); err != nil {
                            fatal(err.Error(), 1)
                     }
                     restoreAndCloseStdin()
              }()
              if tty {
                     resize(id, "init", c)
                     go func() {
                            s := make(chan os.Signal, 64)
                            signal.Notify(s, syscall.SIGWINCH)
                            for range s {
                                   if err := resize(id, "init", c); err != nil {
                                          log.Println(err)
                                   }
                            }
                     }()
              }
              waitForExit(c, events, id, "init", restoreAndCloseStdin)
       }
 
},

### API handling

#### api/grpc/types/api.pb.go

func (c *aPIClient) Events(ctx context.Context, in *EventsRequest, opts ...grpc.CallOption) (API_EventsClient, error) {
       stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, "/types.API/Events", opts...)
       if err != nil {
              return nil, err
       }
       x := &aPIEventsClient{stream}
       if err := x.ClientStream.SendMsg(in); err != nil {
              return nil, err
       }
       if err := x.ClientStream.CloseSend(); err != nil {
              return nil, err
       }
       return x, nil
}
 
func (c *aPIClient) CreateContainer(ctx context.Context, in *CreateContainerRequest, opts ...grpc.CallOption) (*CreateContainerResponse, error) {
       out := new(CreateContainerResponse)
       err := grpc.Invoke(ctx, "/types.API/CreateContainer", in, out, c.cc, opts...)
       if err != nil {
              return nil, err
       }
       return out, nil
}

func (c *aPIClient) UpdateProcess(ctx context.Context, in *UpdateProcessRequest, opts ...grpc.CallOption) (*UpdateProcessResponse, error) {
       out := new(UpdateProcessResponse)
       err := grpc.Invoke(ctx, "/types.API/UpdateProcess", in, out, c.cc, opts...)
       if err != nil {
              return nil, err
       }
       return out, nil
} 
 
func _API_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
       m := new(EventsRequest)
       if err := stream.RecvMsg(m); err != nil {
              return err
       }
       return srv.(APIServer).Events(m, &aPIEventsServer{stream})
}
 
 
func _API_CreateContainer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       in := new(CreateContainerRequest)
       if err := dec(in); err != nil {
              return nil, err
       }
       if interceptor == nil {
              return srv.(APIServer).CreateContainer(ctx, in)
       }
       info := &grpc.UnaryServerInfo{
              Server:     srv,
              FullMethod: "/types.API/CreateContainer",
       }
       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
              return srv.(APIServer).CreateContainer(ctx, req.(*CreateContainerRequest))
       }
       return interceptor(ctx, in, info, handler)
}
  
func _API_UpdateProcess_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
       in := new(UpdateProcessRequest)
       if err := dec(in); err != nil {
              return nil, err
       }
       if interceptor == nil {
              return srv.(APIServer).UpdateProcess(ctx, in)
       }
       info := &grpc.UnaryServerInfo{
              Server:     srv,
              FullMethod: "/types.API/UpdateProcess",
       }
       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
              return srv.(APIServer).UpdateProcess(ctx, req.(*UpdateProcessRequest))
       }
       return interceptor(ctx, in, info, handler)
}

#### api/grpc/server/server.go

The requests enter the task queues set up in the first step (via SendTask):

func (s *apiServer) Events(r *types.EventsRequest, stream types.API_EventsServer) error {
 
events := s.sv.Events(t, r.StoredOnly, r.Id) 

func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContainerRequest) (*types.CreateContainerResponse, error) {
 
s.sv.SendTask(e)
 
apiC, err := createAPIContainer(r.Container, false) 

func (s *apiServer) UpdateProcess(ctx context.Context, r *types.UpdateProcessRequest) (*types.UpdateProcessResponse, error) {
       e := &supervisor.UpdateProcessTask{}
       e.ID = r.Id
       e.PID = r.Pid
       e.Height = int(r.Height)
       e.Width = int(r.Width)
       e.CloseStdin = r.CloseStdin
       s.sv.SendTask(e)
       if err := <-e.ErrorCh(); err != nil {
              return nil, err
       }
       return &types.UpdateProcessResponse{}, nil
}

#### supervisor/create.go

func (s *Supervisor) start(t *StartTask) error {
s.startTasks <- task 
}

#### supervisor/worker.go

func (w *worker) Start() {
       defer w.wg.Done()
       for t := range w.s.startTasks {

#### runtime/container.go

func (c *container) Start(checkpointPath string, s Stdio) (Process, error) {
       processRoot := filepath.Join(c.root, c.id, InitProcessID)
       if err := os.Mkdir(processRoot, 0755); err != nil {
              return nil, err
       }
       cmd := exec.Command(c.shim,
              c.id, c.bundle, c.runtime,
 
       ) // runs the docker-containerd-shim command
 
       cmd.Dir = processRoot
       cmd.SysProcAttr = &syscall.SysProcAttr{
              Setpgid: true,
       }
       spec, err := c.readSpec()
       if err != nil {
              return nil, err
       }
       config := &processConfig{
              checkpoint:  checkpointPath,
              root:        processRoot,
              id:          InitProcessID,
              c:           c,
              stdio:       s,
              spec:        spec,
              processSpec: specs.ProcessSpec(spec.Process),
       }
       p, err := newProcess(config)
       if err != nil {
              return nil, err
       }
       if err := c.createCmd(InitProcessID, cmd, p); err != nil {
              return nil, err
       }
       return p, nil
}
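
To make the command construction in Start easier to see in isolation, here is a small sketch that builds the same kind of `exec.Cmd` without running it. The helper name `shimCommand`, the example paths, and the literal "init" used for the process directory are assumptions for illustration. `Setpgid` is a field of `syscall.SysProcAttr` on Unix-like systems, so the sketch does not compile on Windows.

```go
package main

import (
	"fmt"
	"os/exec"
	"path/filepath"
	"syscall"
)

// shimCommand builds (but does not start) a shim invocation whose three
// positional arguments are the container id, bundle path, and runtime.
func shimCommand(shim, root, id, bundle, runtime string) *exec.Cmd {
	cmd := exec.Command(shim, id, bundle, runtime)
	// The shim runs inside the per-process state directory.
	// "init" is assumed here as the name of the init process directory.
	cmd.Dir = filepath.Join(root, id, "init")
	// Place the shim in its own process group so that signals sent to
	// the daemon's group are not delivered to it as well.
	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
	return cmd
}

func main() {
	cmd := shimCommand("/usr/bin/containerd-shim", "/run/containerd",
		"redis", "/bundles/redis", "runc")
	fmt.Println(cmd.Args)
	fmt.Println(cmd.Dir)
}
```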

The comments on the shim's main function describe the arguments that are passed in:

// Arg0: id of the container
// Arg1: bundle path
// Arg2: runtime binary
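
As an illustration of how three positional arguments like these can be read, the sketch below parses them with the standard `flag` package. It uses a private `flag.FlagSet` so it can be called with an explicit argument slice, whereas the shim itself reads the global `flag.Arg(0..2)`. The `shimArgs` type, the `parseShimArgs` helper, and the argument-count check are assumptions added for the example.

```go
package main

import (
	"flag"
	"fmt"
)

// shimArgs holds the three positional arguments described above.
type shimArgs struct {
	ID      string // Arg0: id of the container
	Bundle  string // Arg1: bundle path
	Runtime string // Arg2: runtime binary
}

// parseShimArgs reads the positional arguments from argv (without the
// program name) and rejects any other argument count.
func parseShimArgs(argv []string) (shimArgs, error) {
	fs := flag.NewFlagSet("shim", flag.ContinueOnError)
	if err := fs.Parse(argv); err != nil {
		return shimArgs{}, err
	}
	if fs.NArg() != 3 {
		return shimArgs{}, fmt.Errorf("expected 3 positional arguments, got %d", fs.NArg())
	}
	return shimArgs{ID: fs.Arg(0), Bundle: fs.Arg(1), Runtime: fs.Arg(2)}, nil
}

func main() {
	args, err := parseShimArgs([]string{"redis", "/bundles/redis", "runc"})
	fmt.Println(args, err)
}
```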

## Handling inside containerd-shim after it receives the call

### containerd-shim/main.go

func start(log *os.File) error {
 
p, err := newProcess(flag.Arg(0), flag.Arg(1), flag.Arg(2))
if err != nil {
       return err
}
defer func() {
       if err := p.Close(); err != nil {
              writeMessage(log, "warn", err)
       }
}()
if err := p.create(); err != nil {
       p.delete()
       return err
} 

### containerd-shim/process.go: invoking the runc command

func (p *process) create() error {
cmd := exec.Command(p.runtime, args...) 

Throughout this analysis, the most important code lives in package supervisor (supervisor.go, worker.go), which is responsible for monitoring and handling tasks.
