客座文章,之前由Gianluca Arbezzano发表。
Kubernetes运行一组控制器,使资源的当前状态与所需的状态保持匹配。可以是一个Pod,服务或任何可以通过Kubernetes控制的东西。K8s的核心价值是可扩展性,允许操作器和应用程序扩展其功能集。基于事件的体系结构,其中所有重要的东西都转换成事件,可以触发自定义代码。
当我想到需要在Kubernetes做某事时采取行动,我的第一个目标是触发的事件,例如:
要了解何时触发这些事件,你可以使用Kubernetes和client-go 暴露的名为SharedInformer的基本功能,该功能位于cache包中。让我们看看在实践中是如何工作的。
https://github.com/kubernetes/client-go
首先,作为与Kubernetes交互的应用程序,你需要建立客户端:
// import "os"
// import corev1 "k8s.io/api/core/v1"
// import "k8s.io/client-go/kubernetes"
// import "k8s.io/client-go/tools/clientcmd"
// Set the kubernetes config file path as environment variable
kubeconfig := os.Getenv("KUBECONFIG")
// Create the client configuration
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
logger.Panic(err.Error())
os.Exit(1)
}
// Create the client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
logger.Panic(err.Error())
os.Exit(1)
}
正如你所看到的,我几乎是逐行注释代码,以便你很好地理解正在发生的事情。现在有了客户端,可以创建SharedInformerFactory了。shared informer监听特定的资源;工厂帮助你创造你需要的。在这个例子中,它查找Pod SharedInformer:
// import v1 "k8s.io/api/core/v1"
// import "k8s.io/client-go/informers"
// import "k8s.io/client-go/tools/cache"
// import "k8s.io/apimachinery/pkg/util/runtime"
// Create the shared informer factory and use the client to connect to
// Kubernetes
factory := informers.NewSharedInformerFactory(clientset, 0)
// Get the informer for the right resource, in this case a Pod
informer := factory.Core().V1().Pods().Informer()
// Create a channel to stops the shared informer gracefully
stopper := make(chan struct{})
defer close(stopper)
// Kubernetes serves an utility to handle API crashes
defer runtime.HandleCrash()
// This is the part where your custom code gets triggered based on the
// event that the shared informer catches
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// When a new pod gets created
AddFunc: func(obj interface{}) { panic("not implemented") },
// When a pod gets updated
UpdateFunc: func(interface{}, interface{}) { panic("not implemented") },
// When a pod gets deleted
DeleteFunc: func(interface{}) { panic("not implemented") },
})
// You need to start the informer, in my case, it runs in the background
go informer.Run(stopper)
了解shared informer使你能够快速扩展Kubernetes。正如所看到的,它不是大量的代码,接口非常清晰。
用例
我用它们写了很多dirty hack,但也完成自动化。拿一些例子看看:
完整的示例
这个例子是go程序,当包含特定标签的新节点加入集群时,它会记录到日志:
package main
import (
"fmt"
"log"
"os"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
const (
// K8S_LABEL_AWS_REGION is the key name to retrieve the region from a
// Node that runs on AWS.
K8S_LABEL_AWS_REGION = "failure-domain.beta.kubernetes.io/region"
)
func main() {
log.Print("Shared Informer app started")
kubeconfig := os.Getenv("KUBECONFIG")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Panic(err.Error())
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Core().V1().Nodes().Informer()
stopper := make(chan struct{})
defer close(stopper)
defer runtime.HandleCrash()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
})
go informer.Run(stopper)
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
<-stopper
}
// onAdd is the function executed when the kubernetes informer notified the
// presence of a new kubernetes node in the cluster
func onAdd(obj interface{}) {
// Cast the obj as node
node := obj.(*corev1.Node)
_, ok := node.GetLabels()[K8S_LABEL_AWS_REGION]
if ok {
fmt.Printf("It has the label!")
}
}