前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kubectl exec 背后到底发生了什么?

Kubectl exec 背后到底发生了什么?

作者头像
米开朗基杨
发布2020-05-26 10:22:23
2.7K0
发布2020-05-26 10:22:23
举报
文章被收录于专栏:云原生实验室云原生实验室

对于经常和 Kubernetes 打交道的 YAML 工程师来说,最常用的命令就是 kubectl exec 了,通过它可以直接在容器内执行命令来调试应用程序。如果你不满足于只是用用而已,想了解 kubectl exec 的工作原理,那么本文值得你仔细读一读。本文将通过参考 kubectlAPI ServerKubelet 和容器运行时接口(CRI)Docker API 中的相关代码来了解该命令是如何工作的。

kubectl exec 的工作原理用一张图就可以表示:

kubectl exec

先来看一个例子:

代码语言:javascript
复制
? → kubectl version --short 
Client Version: v1.15.0 
Server Version: v1.15.3

? → kubectl run nginx --image=nginx --port=80 --generator=run-pod/v1
pod/nginx created

? → kubectl get po     
NAME    READY   STATUS    RESTARTS   AGE 
nginx   1/1     Running   0          6s  

? → kubectl exec nginx -- date
Sat Jan 25 18:47:52 UTC 2020

? → kubectl exec -it nginx -- /bin/bash 
root@nginx:/#

第一个 kubectl exec 在容器内执行了 date 命令,第二个 kubectl exec 使用 -i-t 参数进入了容器的交互式 shell。

重复第二个 kubectl exec 命令,打印更详细的日志:

代码语言:javascript
复制
? → kubectl -v=7 exec -it nginx -- /bin/bash                                                         
I0125 10:51:55.434043   28053 loader.go:359] Config loaded from file:  /home/isim/.kube/kind-config-linkerd
I0125 10:51:55.438595   28053 round_trippers.go:416] GET https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx
I0125 10:51:55.438607   28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.438611   28053 round_trippers.go:426]     Accept: application/json, */*
I0125 10:51:55.438615   28053 round_trippers.go:426]     User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.445942   28053 round_trippers.go:441] Response Status: 200 OK in 7 milliseconds
I0125 10:51:55.451050   28053 round_trippers.go:416] POST https://127.0.0.1:38545/api/v1/namespaces/default/pods/nginx/exec?command=%2Fbin%2Fbash&container=nginx&stdin=true&stdout=true&tty=true
I0125 10:51:55.451063   28053 round_trippers.go:423] Request Headers:
I0125 10:51:55.451067   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v4.channel.k8s.io
I0125 10:51:55.451090   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v3.channel.k8s.io
I0125 10:51:55.451096   28053 round_trippers.go:426]     X-Stream-Protocol-Version: v2.channel.k8s.io
I0125 10:51:55.451100   28053 round_trippers.go:426]     X-Stream-Protocol-Version: channel.k8s.ioI0125 10:51:55.451121   28053 round_trippers.go:426]     User-Agent: kubectl/v1.15.0 (linux/amd64) kubernetes/e8462b5
I0125 10:51:55.465690   28053 round_trippers.go:441] Response Status: 101 Switching Protocols in 14 milliseconds
root@nginx:/#

这里有两个重要的 HTTP 请求:

  • GET 请求用来获取 Pod 信息。
  • POST 请求调用 Pod 的子资源 exec 在容器内执行命令。

子资源(subresource)隶属于某个 K8S 资源,表示为父资源下方的子路径,例如 /logs/status/scale/exec 等。其中每个子资源支持的操作根据对象的不同而改变。

最后 API Server 返回了 101 Ugrade 响应,向客户端表示已切换到 SPDY 协议。

SPDY 允许在单个 TCP 连接上复用独立的 stdin/stdout/stderr/spdy-error 流。

1. API Server 源码分析

请求首先会到底 API Server,先来看看 API Server 是如何注册 rest.ExecRest 处理器来处理子资源请求 /exec 的。这个处理器用来确定 exec 要进入的节点。

API Server 启动过程中做的第一件事就是指挥内嵌的 GenericAPIServer 加载早期的遗留 API(legacy API):

代码语言:javascript
复制
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
 // ...
 if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
  return nil, err
 }
}

在 API 加载过程中,会将类型 LegacyRESTStorage 实例化,创建一个 storage.PodStorage 实例:

代码语言:javascript
复制
podStorage, err := podstore.NewStorage(
 restOptionsGetter,
 nodeStorage.KubeletConnectionInfo,
 c.ProxyTransport,
 podDisruptionClient,
)
if err != nil {
 return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}

随后 storeage.PodStorage 实例会被添加到 map restStorageMap 中。注意,该 map 将路径 pods/exec 映射到了 podStoragerest.ExecRest 处理器。

代码语言:javascript
复制
restStorageMap := map[string]rest.Storage{
 "pods":             podStorage.Pod,
 "pods/attach":      podStorage.Attach,
 "pods/status":      podStorage.Status,
 "pods/log":         podStorage.Log,
 "pods/exec":        podStorage.Exec,
 "pods/portforward": podStorage.PortForward,
 "pods/proxy":       podStorage.Proxy,
 "pods/binding":     podStorage.Binding,
 "bindings":         podStorage.LegacyBinding,

podstorage 为 pod 和子资源提供了 CURD 逻辑和策略的抽象。更多详细信息请查看内嵌的 genericregistry.Store

map restStorageMap 会成为实例 apiGroupInfo 的一部分,添加到 GenericAPIServer 中:

代码语言:javascript
复制
if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
 return err
}

// Install the version handler.
// Add a handler at /<apiPrefix> to enumerate the supported api versions.
s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())

其中 GoRestfulContainer.ServeMux 会将传入的请求 URL 映射到不同的处理器。

接下来重点观察处理器 therest.ExecRest 的工作原理,它的 Connect() 方法会调用函数 pod.ExecLocation() 来确定 pod 中容器的 exec 子资源的 URL

代码语言:javascript
复制
// Connect returns a handler for the pod exec proxy
func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
 execOpts, ok := opts.(*api.PodExecOptions)
 if !ok {
  return nil, fmt.Errorf("invalid options object: %#v", opts)
 }
 location, transport, err := pod.ExecLocation(r.Store, r.KubeletConn, ctx, name, execOpts)
 if err != nil {
  return nil, err
 }
 return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

函数 pod.ExecLocation() 返回的 URL 被 API Server 用来决定连接到哪个节点。

下面接着分析节点上的 Kubelet 源码。

2. Kubelet 源码分析

到了 Kubelet 这边,我们需要关心两点:

  • Kubelet 是如何注册 exec 处理器的?
  • Kubelet 与 Docker API 如何交互?

Kubelet 的初始化过程非常复杂,主要涉及到两个函数:

  • PreInitRuntimeService() : 使用 dockershim 包来初始化 CRI
  • RunKubelet() : 注册处理器,启动 Kubelet 服务。

注册处理器

当 Kubelet 启动时,它的 RunKubelet() 函数会调用私有函数 startKubelet() 来启动 kubelet.Kubelet 实例的 ListenAndServe() 方法,然后该方法会调用函数 ListenAndServeKubeletServer() ,使用构造函数 NewServer() 来安装 『debugging』处理器:

代码语言:javascript
复制
// NewServer initializes and configures a kubelet.Server object to handle HTTP requests.
func NewServer(
 // ...
 criHandler http.Handler) Server {
 // ...
 if enableDebuggingHandlers {
  server.InstallDebuggingHandlers(criHandler)
  if enableContentionProfiling {
   goruntime.SetBlockProfileRate(1)
  }
 } else {
  server.InstallDebuggingDisabledHandlers()
 }
 return server
}

InstallDebuggingHandlers() 函数使用 getExec() 处理器来注册 HTTP 请求模式:

代码语言:javascript
复制
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  // ...
  ws = new(restful.WebService)
 ws.
  Path("/exec")
 ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 s.restfulCont.Add(ws)

其中 getExec() 处理器又会调用 s.host 实例中的 GetExec() 方法:

代码语言:javascript
复制
// getExec handles requests to run a command inside a container.
func (s *Server) getExec(request *restful.Request, response *restful.Response) {
   // ...
 podFullName := kubecontainer.GetPodFullName(pod)
 url, err := s.host.GetExec(podFullName, params.podUID, params.containerName, params.cmd, *streamOpts)
 if err != nil {
  streaming.WriteError(err, response.ResponseWriter)
  return
 }
 // ...
}

s.host 被实例化为 kubelet.Kubelet 类型的一个实例,它嵌套引用了 StreamingRuntime 接口,该接口又被实例化为 kubeGenericRuntimeManager 的实例,即运行时管理器。该运行时管理器是 Kubelet 与 Docker API 交互的关键组件,GetExec() 方法就是由它实现的:

代码语言:javascript
复制
// GetExec gets the endpoint the runtime will serve the exec request from.
func (m *kubeGenericRuntimeManager) GetExec(id kubecontainer.ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error) {
 // ...
 resp, err := m.runtimeService.Exec(req)
 if err != nil {
  return nil, err
 }

 return url.Parse(resp.Url)
}

GetExec() 又会调用 runtimeService.Exec() 方法,进一步挖掘你会发现 runtimeService 是 CRI 包中定义的接口。kuberuntime.kubeGenericRuntimeManagerruntimeService 被实例化为 kuberuntime.instrumentedRuntimeService 类型,由它来实现 runtimeService.Exec() 方法:

代码语言:javascript
复制
func (in instrumentedRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
 const operation = "exec"
 defer recordOperation(operation, time.Now())

 resp, err := in.service.Exec(req)
 recordError(operation, err)
 return resp, err
}

instrumentedRuntimeService 实例的嵌套服务对象被实例化为 theremote.RemoteRuntimeService 类型的实例。该类型实现了 Exec() 方法:

代码语言:javascript
复制
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (r *RemoteRuntimeService) Exec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
 ctx, cancel := getContextWithTimeout(r.timeout)
 defer cancel()

 resp, err := r.runtimeClient.Exec(ctx, req)
 if err != nil {
  klog.Errorf("Exec %s '%s' from runtime service failed: %v", req.ContainerId, strings.Join(req.Cmd, " "), err)
  return nil, err
 }

 if resp.Url == "" {
  errorMessage := "URL is not set"
  klog.Errorf("Exec failed: %s", errorMessage)
  return nil, errors.New(errorMessage)
 }

 return resp, nil
}

Exec() 方法会向 /runtime.v1alpha2.RuntimeService/Exec 发起一个 gRPC 调用来让运行时端准备一个流式通信的端点,该端点用于在容器中执行命令(关于如何将 Docker shim 设置为 gRPC 服务端的更多信息请参考下一小节)。

gRPC 服务端通过调用 RuntimeServiceServer.Exec() 方法来处理请求,该方法由 dockershim.dockerService 结构体实现:

代码语言:javascript
复制
// Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
func (ds *dockerService) Exec(_ context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
 if ds.streamingServer == nil {
  return nil, streaming.NewErrorStreamingDisabled("exec")
 }
 _, err := checkContainerStatus(ds.client, req.ContainerId)
 if err != nil {
  return nil, err
 }
 return ds.streamingServer.GetExec(req)
}

第 10 行的 ThestreamingServer 是一个 streaming.Server 接口,它在构造函数 dockershim.NewDockerService() 中被实例化:

代码语言:javascript
复制
// create streaming server if configured.
if streamingConfig != nil {
 var err error
 ds.streamingServer, err = streaming.NewServer(*streamingConfig, ds.streamingRuntime)
 if err != nil {
  return nil, err
 }
}

来看一下 GetExec() 方法的实现方式:

代码语言:javascript
复制
func (s *server) GetExec(req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
 if err := validateExecRequest(req); err != nil {
  return nil, err
 }
 token, err := s.cache.Insert(req)
 if err != nil {
  return nil, err
 }
 return &runtimeapi.ExecResponse{
  Url: s.buildURL("exec", token),
 }, nil
}

可以看到这里只是向客户端返回一个简单的 token 组合成的 URL, 之所以生成一个 token 是因为用户的命令中可能包含各种各样的字符,各种长度的字符,需要格式化为一个简单的 token。该 token 会缓存在本地,后面真正的 exec 请求会携带这个 token,通过该 token 找到之前的具体请求。其中 restful.WebService 实例会将 pod exec 请求路由到这个端点:

代码语言:javascript
复制
// InstallDebuggingHandlers registers the HTTP request patterns that serve logs or run commands/containers
func (s *Server) InstallDebuggingHandlers(criHandler http.Handler) {
  // ...
  ws = new(restful.WebService)
 ws.
  Path("/exec")
 ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
  To(s.getExec).
  Operation("getExec"))
 s.restfulCont.Add(ws)

创建 Docker shim

PreInitRuntimeService() 函数作为 gRPC 服务端,负责创建并启动 Docker shim。在将dockershim.dockerService 类型实例化时,让其嵌套的 streamingRuntime 实例引用 dockershim.NativeExecHandler 的实例(该实例实现了 dockershim.ExecHandler 接口)。

代码语言:javascript
复制
ds := &dockerService{
 // ...
 streamingRuntime: &streamingRuntime{
  client:      client,
  execHandler: &NativeExecHandler{},
 },
 // ...
}

使用 Docker 的 exec API 在容器中执行命令的核心实现就是 NativeExecHandler.ExecInContainer() 方法:

代码语言:javascript
复制
func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error {
 // ...
 startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty}
 streamOpts := libdocker.StreamOptions{
  InputStream:  stdin,
  OutputStream: stdout,
  ErrorStream:  stderr,
  RawTerminal:  tty,
  ExecStarted:  execStarted,
 }
 err = client.StartExec(execObj.ID, startOpts, streamOpts)
 if err != nil {
  return err
 }
 // ...

这里就是最终 Kubelet 调用 Docker exec API 的地方。

最后需要搞清楚的是 streamingServer 处理器如何处理 exec 请求。首先需要找到它的 exec 处理器,我们直接从构造函数 streaming.NewServer() 开始往下找,因为这是将 /exec/{token} 路径绑定到 serveExec 处理器的地方:

代码语言:javascript
复制
ws := &restful.WebService{}
endpoints := []struct {
 path    string
 handler restful.RouteFunction
}{
 {"/exec/{token}", s.serveExec},
 {"/attach/{token}", s.serveAttach},
 {"/portforward/{token}", s.servePortForward},
}

所有发送到 dockershim.dockerService 实例的请求最终都会在 streamingServer 处理器上完成,因为 dockerService.ServeHTTP() 方法会调用 streamingServer 实例的 ServeHTTP() 方法。

serveExec 处理器会调用 remoteCommand.ServeExec() 函数,这个函数又是干嘛的呢?它会调用前面提到的 Executor.ExecInContainer() 方法,而 ExecInContainer() 方法是知道如何与 Docker exec API 通信的:

代码语言:javascript
复制
// ServeExec handles requests to execute a command in a container. After
// creating/receiving the required streams, it delegates the actual execution
// to the executor.
func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) {
 // ...
 err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0)
 if err != nil {
 // ...
 } else {
 // ... 
 }
}

3. 总结

本文通过解读 kubectlAPI ServerCRI 的源码,帮助大家理解 kubectl exec 命令的工作原理,当然,这里并没有涉及到 Docker exec API 的细节,也没有涉及到 docker exec 的工作原理。

首先,kubectl 向 API Server 发出了 GETPOST 请求,API Server 返回了 101 Ugrade 响应,向客户端表示已切换到 SPDY 协议。

随后 API Server 使用 storage.PodStoragerest.ExecRest 来提供处理器的映射和执行逻辑,其中 rest.ExecRest 处理器决定 exec 要进入的节点。

最后 Kubelet 向 Docker shim 请求一个流式端点 URL,并将 exec 请求转发到 Docker exec API。kubelet 再将这个 URL 以 Redirect 的方式返回给 API Server,请求就会重定向到到对应 Streaming Server 上发起的 exec 请求,并维护长链。

虽然本文只关注了 kubectl exec 命令,但其他的子命令(例如 attachport-forwardlog 等等)也遵循了类似的实现模式:

kubectl

原文链接:https://itnext.io/how-it-works-kubectl-exec-e31325daa910

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 云原生实验室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. API Server 源码分析
  • 2. Kubelet 源码分析
    • 注册处理器
      • 创建 Docker shim
      • 3. 总结
      相关产品与服务
      容器镜像服务
      容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档