专栏首页风云笔记Kubelet Device Plugin 的工作机制
原创

Kubelet Device Plugin 的工作机制

device plugin简介

从Kubernetes 1.8开始,官方推荐使用Device Plugins方式来使用GPU、FPGA、NIC、InfiniBand等高性能硬件。

Kubernetes provides adevice plugin frameworkthat you can use to advertise system hardware
resources to theKubelet.
Instead of customizing the code for Kubernetes itself, vendors can implement adevice plugin that 
you deploy either manually or as aDaemonSet.The targeted devices include GPUs, high-performance
NICs, FPGAs, InfiniBand adapters,and other similar computing resources that may require vendor
specific initializationand setup.

整个 Device Plugin 的工作机制可以分成以下四个部分:

device server启动和注册;

device的分配;

device的使用;

device的状态管理。

注:为了方便理解,先注解关键词的含义:

  1. device server指向kubelet注册device的服务,如NVIDIA的k8s-device-plugin;
  2. deviceManager代表kubelet device plugin的服务ManagerImpl

device server的启动和注册

device server实现了DevicePluginClient接口的方法:

type DevicePluginClient interface {
   // 获取device server的管理plugin的参数,如PreStartRequired用来判断container启动使用plugin之前
   // 是否需要执行PreStartContainer()
   GetDevicePluginOptions(ctx context.Context, in *Empty, opts ...grpc.CallOption)
   (*DevicePluginOptions, error)
   // 监听plugin的状态,当plugin的状态发生变化时及时反馈到deviceManager
   ListAndWatch(ctx context.Context, in *Empty, opts ...grpc.CallOption)
   (DevicePlugin_ListAndWatchClient, error)
   // device server分配plugin,返回container使用该plugin的参数
   Allocate(ctx context.Context, in *AllocateRequest, opts ...grpc.CallOption) 
   (*AllocateResponse, error)
   // container启动使用plugin之前需要执行的操作
   PreStartContainer(ctx context.Context, in *PreStartContainerRequest, opts ...grpc.CallOption)
   (*PreStartContainerResponse, error)
}

device server在启动的时候会监测/var/lib/kubelet/device-plugins/kubelet.sock文件,当该文件被创建,意味着kubelet发生了重启或者刚启动,那么device server会自动重启重新注册。

if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
   log.Printf("inotify: %s created, restarting.", pluginapi.KubeletSocket)
   goto restart
}

device server的启动和注册流程

1.调用Devices()获取自身管理的device详情列表

2.调用Start()方法基于UDS(如/var/lib/kubelet/device-plugins/nvidia.sock)启动一个Grpc服务,kubelet deviceManager就是通过这个UDS和device server通信。

3.调用Register()完成注册,device server通过/var/lib/kubelet/device-plugins/kubelet.sock向kubelet deviceManager注册自己的版本、endpoint(即1中的UDS),名字。

4.调用CheckHealth开启协程检查device列表中各device的健康状况

kubelet deviceManager的Register()负责接收并处理device server的注册请求,先来看看deviceManager的启动:

deviceManager的启动

Device Manager(ManagerImpl)的结构体

type ManagerImpl struct {
   // kubelet对外暴露的socket名,即 kubelet.sock
   socketname string
   // device plugins' socket的存放的目录,/var/lib/kubelet/device-plugins/
   socketdir  string
   // map对象,key为Resource Name,value为endpoint接口(包括run,stop,allocate,preStartContainer,
   // getDevices,callback,isStoped,StopGracePeriodExpired),每个endpoint接口对应一个已注册的device
   // plugin,负责与device plugin的gRPC通信及缓存device plugin反馈的device states。
   endpoints map[string]endpointInfo
   mutex     sync.Mutex
   // Register服务暴露的gRPC Server
   server *grpc.Server
   wg     sync.WaitGroup
   // 用来获取该节点上所有active pods,即non-Terminated状态的Pods, 即:
   // allPods := kl.podManager.GetPods()
   // activePods := kl.filterOutTerminatedPods(allPods)
   activePods ActivePodsFunc
   sourcesReady config.SourcesReady
   // kubelet收到device plugin的ListAndWatch gRCP stream中有devices state变更时的回调函数,
   // 包括有新设备增加、旧设备删除、设备状态变化,所以通过ListAndWatch接口的回调方式,可以实现设备的自动发现
   // 和热插拔。
   callback monitorCallback
   // 缓存所有设备信息,map[设备类型名][设备id]设备详情
   allDevices map[string]map[string]pluginapi.Device
   // key为Resource Name,value为对应的健康的device IDs。
   healthyDevices map[string]sets.String
   // key为Resource Name,value为对应的不健康的device IDs。
   unhealthyDevices map[string]sets.String
   // key为Resource Name,value为已经分配出去的device IDs。
   allocatedDevices map[string]sets.String
   // 记录每个pod中每个容器的device分配情况
   podDevices        podDevices
   // 在/var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文件
   // kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。

   checkpointManager checkpointmanager.CheckpointManager
   numaNodes []int
   // 本地存储设备和pod的对应信息(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint),
   // 具体存储了每个Pod分配的Devices信息PodDeviceEntries, 以及已经注册的Resource Name及对应的Devices IDs
   topologyAffinityStore topologymanager.Store
}

1.Device Manager在kubelet启动时的NewContainerManager中创建,属于containerManager的子模块

pkg/kubelet/cm/container_manager_linux.go

func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, 
                         nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool,
                         recorder record.EventRecorder) (ContainerManager, error) {
        
...略
if devicePluginEnabled {
   cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
   cm.topologyManager.AddHintProvider(cm.deviceManager)
}

2.初始化结构体 ManagerImpl{}的主要逻辑

//注册callback为genericDeviceUpdateCallback,用来处理对应devices的add,delete,update事件。
manager.callback = manager.genericDeviceUpdateCallback

genericDeviceUpdateCallback的实现:

  • 将callback中收到的devices状态是Healthy,那么将device ID插入到ManagerImpl中healthDevices中,并从unhealthyDevices中删除。
  • 将callback中收到的devices状态是Unhealthy,那么将device ID插入到ManagerImpl中unhealthDevices中,并从healthyDevices中删除。
  • 将device plugin反馈的需要delete的devices从healthDevices和unhealthDevices中一并删除。
  • 将ManagerImpl中的数据更新到/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint文件中。

3.Device Manager的启动Start()ManagerIml.Start()负责启动Device Manager,对外提供gRPC服务。首先读取checkpoint file中数据,恢复ManagerImpl的相关数据,包括:

  • podDevices;
  • allocatedDevices;
  • healthyDevices;
  • unhealthyDevices;
  • endpoints,

注意这里会将endpoint的stop time设置为当前时间,意味着kubelet restart后,需要等待device plugin进行re-register后,才认为这些resource是可用的。

// Removes all stale sockets in m.socketdir. Device plugins can monitor
// this and use it as a signal to re-register with the new Kubelet.
if err := m.removeContents(m.socketdir); err != nil {
   klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
}

然后将/var/lib/kubelet/device-plugins/下面的所有文件清空,当然checkpiont文件除外,也就是清空所有的socket文件,包括自己的kubelet.sock,以及其他所有之前的device plugin的socket文件。

最后创建kubelet.sock并启动gRPC Server对外提供gRPC服务,其中Register()用于Device plugin调用进行插件注册。

// Detect a kubelet restart by watching for a newly created
// 'pluginapi.KubeletSocket' file. When this occurs, restart this loop,
// restarting all of the plugins in the process.
case event := <-watcher.Events:
   if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
      log.Printf("inotify: %s created, restarting.", pluginapi.KubeletSocket)
      goto restart
   }

device server会监控kubelet.sock文件是否为新创建,如果是,则会触发自己的向kubelet重新注册自己。

deviceManager的注册逻辑

1.deviceManager接收到device server的 RegisterRequest请求,其结构体如下

type RegisterRequest struct {
   Version string
   Endpoint string 
   ResourceName string 
   Options   *DevicePluginOptions 
}

2.检查注册的device Name、version是否符合Extended Resource的规则,Name不能属于kubernetes.io,得有自己的domain,比如nvidia.com

3.根据endpoint信息创建EndpointImpl对象,即根据endpoint建立socket连接

4.将device name、 version、endpoint和socket连接存入deviceManager的缓存中

5.执行EndpointImpl对象的run(),在run方法中:

func (e *endpointImpl) run() {
	//建立stream conn
	stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
	if err != nil {
		klog.Errorf(errListAndWatch, e.resourceName, err)

		return
	}

	for {
		response, err := stream.Recv()
		if err != nil {
			klog.Errorf(errListAndWatch, e.resourceName, err)
			return
		}

		devs := response.Devices
		klog.V(2).Infof("State pushed for device plugin %s", e.resourceName)

		var newDevs []pluginapi.Device
		for _, d := range devs {
			newDevs = append(newDevs, *d)
		}

		e.callback(e.resourceName, newDevs)
	}
}

调用device server的ListAndWatch gRPC接口,通过长连接持续获取ListAndWatch gRPC stream,从stream流中获取的devices详情列表然后调用Endpoint的callback(也就是ManagerImpl注册的callback方法genericDeviceUpdateCallback)进行Device Manager的缓存更新并写到checkpoint文件中。

run()是通过协程启动的,持续获取device server的ListAndWatch结果,持续更新device状态;

当获取异常时,deviceManager断开连接,将device设置为不健康的状态。

回到device server,阅读下ListAndWatch()的工作

func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, 
                                          s pluginapi.DevicePlugin_ListAndWatchServer) error {
   s.Send(&pluginapi.ListAndWatchResponse{Devices: m.cachedDevices})

   for {
      select {
      case <-m.stop:
         return nil
      case d := <-m.health:
         // FIXME: there is no way to recover from the Unhealthy state.
         d.Health = pluginapi.Unhealthy
         log.Printf("'%s' device marked unhealthy: %s", m.resourceName, d.ID)
         s.Send(&pluginapi.ListAndWatchResponse{Devices: m.cachedDevices})
      }
   }
}

1.先是立马返回device详情列表

2.开启协程,一旦感知device的健康状态发生变化了,更细device详情列表再次返回给deviceManager

3.回想起健康检查,device server的CheckHealth就是2的生产者

注册流程基本走通,了解k8s plugin manager的同学可能有个疑问,plugin manager也能注册device server,那两者是一个什么关系?

简述plugin manager的工作机制

1.PluginManager会监听/var/lib/kubelet/plugins_registry下的UDS文件,当有新创建的文件就会注册它,当文件被删同样也会删除该plugin;该部分注册流程,使用k8s常见的actual-desired模式,本文略过

2.当前支持两种类型的

  • CSIPlugin-->csi.PluginHandler
  • DevicePlugin-->PluginRegistrationHandler

3.plugin客户端注册时需要在GetInfo()指定类型,如csi-node-driver-registrar注册CSI插件时会指定CSIPlugin

4.继续分析下PluginRegistrationHandler:

PluginRegistrationHandler最终还是调用device manager来注册device server

func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
   return cm.deviceManager.GetWatcherHandler()
}
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
   return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}

RegisterPlugin()和Register()最终调用的注册方法是一样的,也就说(以nvidia为例):

1.device server是通过/var/lib/kubelet/device-plugins/kubelet.sock主动向deviceManager注册自己/var/lib/kubelet/device-plugins/nvidia.sock

2.pluginManager是监听/var/lib/kubelet/plugins_registry目录下的UDS,当/var/lib/kubelet/plugins_registry/nvidia-xxx-reg.sock被创建,pluginManager主动通过该uds文件和注册服务通信,获取device server的注册信息包括name、version、endpoint(/var/lib/kubelet/plugins/nvidia-xxx/xxx.sock)

总结:deviceManager和pluginManager并没有什么关联,deviceManager更专注于注册device server,pluginManager可以注册device server、CSI server等plugin

至此,分析到好几个目录,避免混淆,先列举一下

# pwd
/var/lib/kubelet
# tree
.
├── config.yaml   // kubelet的配置文件
├── cpu_manager_state   // cpu manager的checkpoint文件
├── device-plugins      // device manager的工作目录
│   ├── DEPRECATION
│   ├── kubelet_internal_checkpoint // device manager的checkpoint文件,
// 具体存储了每个Pod分配的Devices信息PodDeviceEntries, 以及已经注册的Resource Name及对应的Devices IDs。
│   └── kubelet.sock    // device manager和device server通信的UDS文件
├── kubeadm-flags.env   // kubeadm配置kubelet服务的配置参数
├── plugins             // 存放kubelet plugin manager和plugin server交互的UDS文件的目录
│   ├── kubernetes.io
│   │   └── csi
│   │       └── pv
│   └── xxx.csi.xxx.io
│       └── csi.sock    // csi server的UDS文件
├── plugins_registry
│   └── xxx.csi.xxx.io-reg.sock  // 用户注册csi server的UDS文件
├── pod-resources
│   └── kubelet.sock    // PodResources service的工作目录

看到pod-resources,可以多扯一下

func (kl *Kubelet) ListenAndServePodResources() {
   socket, err := util.LocalEndpoint(kl.getPodResourcesDir(), podresources.Socket)
   if err != nil {
      klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err)
      return
   }
   server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager)
}

kubelet启动时,会创建PodResources rpc服务,该服务用于发现pod正在使用的资源或设备,并提供资源或设备的元数据给第三方监控插件,常用于监控资源或设备。PodResources服务通过unix套接字/var/lib/kubelet/pod-resources/kubelet.sock提供。

// PodResources is a service provided by the kubelet that provides information about the
// node resources consumed by pods and containers on the node
service PodResources {
    rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {}
}

当前 PodResources服务只有一个List()方法

// List returns information about the resources assigned to pods on the node
func (p *podResourcesServer) List(ctx context.Context, req *v1alpha1.ListPodResourcesRequest) 
                                 (*v1alpha1.ListPodResourcesResponse, error) {
…
for j, container := range pod.Spec.Containers {
   pRes.Containers[j] = &v1alpha1.ContainerResources{
      Name:    container.Name,
      Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name),
   }
}
...
}

讨论参考:https://github.com/kubernetes/enhancements/issues/606

device的分配

kubelet接收到被调度到本节点的pods后,lifecycle.PodAdmitHandler会对pods做一些判断,如

  • evictionAdmitHandler:当节点有内存压力时,拒绝创建best effort的pod,还有其它条件先略过
  • TopologyPodAdmitHandler:拒绝创建因为Topology locality冲突而无法分配资源的pod

总体是对pod的资源做一些准入判断。

func NewPredicateAdmitHandler(getNodeAnyWayFunc getNodeAnyWayFuncType, 
admissionFailureHandler AdmissionFailureHandler, 
pluginResourceUpdateFunc pluginResourceUpdateFuncType) *predicateAdmitHandler {
   return &predicateAdmitHandler{
      getNodeAnyWayFunc,
      pluginResourceUpdateFunc,
      admissionFailureHandler,
   }
}
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, 
attrs *lifecycle.PodAdmitAttributes) error {
   return cm.deviceManager.Allocate(node, attrs)
}

pod的device资源的判断由predicateAdmitHandler调用deviceManager.Allocate来处理,接下来分下ManagerImpl.Allocate()的工作流程:

ManagerImpl.Allocate()的工作流程

1.allocateContainerResources为Pod中的init container分配devices,并更新deviceManager中PodDevices缓存;

2.allocateContainerResources为Pod中的regular container分配devices,并更新deviceManager中PodDevices缓存;每次在为Pod分配devices之前,都去检查一下此时的active pods,并与podDevices缓存中的pods进行比对,将已经terminated的Pods的devices从podDevices中删除,即进行了devices的GC操作。从healthyDevices中随机分配对应数量的devices给该Pod,并注意更新allocatedDevices,否则会导致一个device被分配给多个Pod。拿到devices后,就通过Grpc调用device server的Allocate方法,device server返回ContainerAllocateResponse(包括注入的环境变量、挂载信息、Annotations),deviceManager根据pod uuid和container name将返回的信息存入podDevices缓存。更新podDevices缓存信息,并将deviceManager中缓存数据更新到checkpoint文件中。

3.sanitizeNodeAllocatable更新cache中Node对应Resource Name的Allocatable Resource,对于下一个pod,predicateAdmitHandler使用最新的数据来做准入判断

device的使用

在kubelet的GetResource中,会调用DeviceManager的GetDeviceRunContainerOptions,并将这些options添加到kubecontainer.RunContainerOptions中。RunContainerOptions包括Envs、Mounts、Devices、PortMappings、Annotations等信息。

kubelet调用GetResources()为启动container获取启动参数runtimeapi.ContainerConfig{Args...}

devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if err != nil {
   return nil, err
} else if devOpts == nil {
   return opts, nil
}
opts.Devices = append(opts.Devices, devOpts.Devices...)
opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
opts.Envs = append(opts.Envs, devOpts.Envs...)
opts.Annotations = append(opts.Annotations, devOpts.Annotations...)

GetDeviceRunContainerOptions()根据pod uuid和container name从podDevices缓存(device的分配过程中会设置缓存数据)中取出Envs、Mounts、Devices、PortMappings、Annotations等信息,另外对于一些PreStartRequired为true的device server,deviceManager需要在启动container之前调用device server的PreStartContainer grpc接口,做一些device的初始化工作,超时时间限制为30秒。

ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
defer cancel()
return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
   DevicesIDs: devs,
})

device的状态管理

device的状态管理涉及到以下3个部分:

1.node上的device状态管理当kubelet更新node status时会调用GetCapacity更新device plugins对应的Resource信息。

kubelet_node_status.go调用deviceManager的GetCapacity()获取device的状态,将device状态添加到node info并通过kube-apiserver存入etcd,GetCapacity()返回device server含有的所有device、已经分配给pod使用的device、pod不能使用的device即no-active的device kubelet_node_status.go根据返回的数据更新node info

2.kubelet deviceManager服务的device状态管理其实在device的注册、device分配中都有讲解,即使用checkpoint机制默认是将podDevices以 PodDevicesEntry的格式存入/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint 文件

type PodDevicesEntry struct {
   PodUID        string
   ContainerName string
   ResourceName  string
   DeviceIDs     []string
   AllocResp     []byte     //包含启动container时使用的Envs、Mounts、Devices、PortMappings、Annotations等信息
}

只要device的状态发生了变化(如注册新device、device被分配、device的健康状态发生变化、device被删除),就要将podDevices存入kubelet_internal_checkpoint 文件。kubelet在启动或重启时,都需要读取kubelet_internal_checkpoint 文件里的数据,并以podDevices格式存入podDevices缓存。

3.device server上报device状态本小节的内容在device的注册部分已经讲解过,归纳为deviceManager注册完device server后,会跟device server建立长连接,持续获取device server的ListAndWatch结果,持续更新device状态;当获取异常时,deviceManager断开连接,将device设置为不健康的状态;device server默认会重启重新注册,重新上报device的状态

参考文章:

https://kubernetes.io/docs/concepts/extend-kubernetes/compute-storage-net/device-plugins/

https://www.jianshu.com/p/cad4cd8bc237

https://www.dazhuanlan.com/2019/12/18/5df9aeb084f93/

https://www.cnblogs.com/hezhiqiangTS/p/12011388.html

原创声明,本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

登录 后参与评论
0 条评论

相关文章

  • offline plugin的工作原理

    The plugin is supported for use with applications on the Android and iOS platfor...

    Jerry Wang
  • Kubelet Deivce Manager源码分析

    本文基于Kubernetes v1.10的代码,对Kubelet Device Manager的实现进行了代码走读分析,方便对kubelet与device pl...

    Walton
  • kubelet 中垃圾回收机制的设计与实现

    本文主要分析 kubelet 中的垃圾回收机制,垃圾回收的主要目的是为了节约宿主上的资源,gc controller 的回收机制可以参考以前的文章 garbag...

    田飞雨
  • Kubernetes如何通过Devi

    Device Plugins Device Pulgins在Kubernetes 1.10中是beta特性,开始于Kubernetes 1.8,用来给第三方设备...

    Walton
  • Kubelet Deivce Manager源码分析

    Walton
  • Kubelet从入门到放弃系列:GPU加持

    《Kubelet从入门到放弃系列》将对Kubelet组件由Linux基础知识到源码进行深入梳理。上一篇zouyee带各位看了Kubelet从入门到放弃:拓扑管理...

    CNCF
  • Kubelet从入门到放弃系列:GPU加持

    《Kubelet从入门到放弃系列》将对Kubelet组件由Linux基础知识到源码进行深入梳理。上一篇zouyee带各位看了Kubelet从入门到放弃:拓扑管理...

    zouyee
  • Kubernetes 多卡GPU使用和分析

    Kubernetes中通过device plugin将GPU作为一种resource来使用,因此需要先创建一个device plugin将GPU信息注册到Kub...

    langwu 吴英文
  • ClassLoader的工作机制

    用户2146856
  • ClassLoader的工作机制

    用户2146856
  • ElasticSearch的工作机制

    ElasticSearch,和Solr一样,是底层基于Apache Lucene,且具备高可靠性的企业级搜索引擎。

    星哥玩云
  • 基于 Kubernetes 的 GPU 类型调度实现

    3 月 27 日,ACM 宣布深度学习的三位缔造者——Yoshua Bengio、Yann LeCun 及 Geoffrey Hinton 获得了 2018 年...

    kubernetes中文社区
  • 基于Kubernetes的GPU类型调度实现

    3 月 27 日,ACM 宣布深度学习的三位缔造者——Yoshua Bengio、Yann LeCun 及 Geoffrey Hinton 获得了 2018 年...

    CNCF
  • Docker及Kubernetes下device使用和分析

    Docker有两种方式访问设备,一种是使用特权模式,一种是通过--device指定要访问的设备。

    langwu 吴英文
  • Hadoop的namenode的管理机制,工作机制和datanode的工作原理

    HDFS前言:   1) 设计思想     分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析; ...

    别先生
  • NVIDIA/k8s-device-plugin源码分析

    Author: xidianwangtao@gmail.com k8s-device-plugin内部实现原理图 在Kubernetes如何通过Devic...

    Walton
  • 如何在Kubernetes集群中利用GPU进行AI训练

    Author: xidianwangtao@gmail.com 注意事项 截止Kubernetes 1.8版本: 对GPU的支持还只是实验阶段,仍停留在A...

    Walton
  • gsoap入门:gsoap的plugin机制说明塈使用plugin实现调试soap函数时显示

    版权声明:本文为博主原创文章,转载请注明源地址。 https://blog.csdn.net...

    用户1148648

扫码关注腾讯云开发者

领取腾讯云代金券