前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Shared Informer扩展Kubernetes

通过Shared Informer扩展Kubernetes

作者头像
CNCF
发布2019-12-04 11:15:13
2.1K0
发布2019-12-04 11:15:13
举报
文章被收录于专栏:CNCF

客座文章,之前由Gianluca Arbezzano发表。

Kubernetes运行一组控制器,使资源的当前状态与所需的状态保持匹配。可以是一个Pod,服务或任何可以通过Kubernetes控制的东西。K8s的核心价值是可扩展性,允许操作器和应用程序扩展其功能集。基于事件的体系结构,其中所有重要的东西都转换成事件,可以触发自定义代码。

当我想到需要在Kubernetes做某事时采取行动,我的第一个目标是触发的事件,例如:

  • 新创建Pod
  • 新节点加入
  • 服务被移除,还有很多很多。

要了解何时触发这些事件,你可以使用Kubernetes和client-go 暴露的名为SharedInformer的基本功能,该功能位于cache包中。让我们看看在实践中是如何工作的。

https://github.com/kubernetes/client-go

首先,作为与Kubernetes交互的应用程序,你需要建立客户端:

代码语言:javascript
复制
// import "os"
// import  corev1 "k8s.io/api/core/v1"
// import  "k8s.io/client-go/kubernetes"
// import  "k8s.io/client-go/tools/clientcmd"


// Set the kubernetes config file path as environment variable
kubeconfig := os.Getenv("KUBECONFIG")

// Create the client configuration
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
    logger.Panic(err.Error())
    os.Exit(1)
}

// Create the client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
    logger.Panic(err.Error())
    os.Exit(1)
}

正如你所看到的,我几乎是逐行注释代码,以便你很好地理解正在发生的事情。现在有了客户端,可以创建SharedInformerFactory了。shared informer监听特定的资源;工厂帮助你创造你需要的。在这个例子中,它查找Pod SharedInformer:

代码语言:javascript
复制
// import v1 "k8s.io/api/core/v1"
 // import "k8s.io/client-go/informers"
// import  "k8s.io/client-go/tools/cache"
// import "k8s.io/apimachinery/pkg/util/runtime"

// Create the shared informer factory and use the client to connect to
// Kubernetes
factory := informers.NewSharedInformerFactory(clientset, 0)

// Get the informer for the right resource, in this case a Pod
informer := factory.Core().V1().Pods().Informer()

// Create a channel to stops the shared informer gracefully
stopper := make(chan struct{})
defer close(stopper)

// Kubernetes serves an utility to handle API crashes
defer runtime.HandleCrash()

// This is the part where your custom code gets triggered based on the
// event that the shared informer catches
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    // When a new pod gets created
    AddFunc:    func(obj interface{}) { panic("not implemented") },
    // When a pod gets updated
    UpdateFunc: func(interface{}, interface{}) { panic("not implemented") },
    // When a pod gets deleted
    DeleteFunc: func(interface{}) { panic("not implemented") },
})

// You need to start the informer, in my case, it runs in the background
go informer.Run(stopper)

了解shared informer使你能够快速扩展Kubernetes。正如所看到的,它不是大量的代码,接口非常清晰。

用例

我用它们写了很多dirty hack,但也完成自动化。拿一些例子看看:

  1. 在创建具有持久卷的Pod时,我们曾经遇到非常恼人的错误。这不是高速率的错误,重新启动能使一切如预期工作。一个dirty hack十分明显;我使用Shared Informer来自动手动重新启动pod,就像向你展示的一样
  2. 我正在使用AWS,我想把一些EC2标签推送去作为kubelet标签。我使用了Shared Informer,但这一次是为了观察新节点何时加入集群。我可以从新节点获得AWS instanceID(它本身是一个标签),并使用AWS API。我可以检索它的标记来识别如何通过Kubernetes API编辑节点本身。所有内容都是shared informer中AddFunc的一部分。

完整的示例

这个例子是go程序,当包含特定标签的新节点加入集群时,它会记录到日志:

代码语言:javascript
复制
package main

import (
    "fmt"
    "log"
    "os"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/runtime"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

const (
    // K8S_LABEL_AWS_REGION is the key name to retrieve the region from a
    // Node that runs on AWS.
    K8S_LABEL_AWS_REGION = "failure-domain.beta.kubernetes.io/region"
)

func main() {
    log.Print("Shared Informer app started")
    kubeconfig := os.Getenv("KUBECONFIG")
    config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
    if err != nil {
        log.Panic(err.Error())
    }
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err.Error())
    }

    factory := informers.NewSharedInformerFactory(clientset, 0)
    informer := factory.Core().V1().Nodes().Informer()
    stopper := make(chan struct{})
    defer close(stopper)
    defer runtime.HandleCrash()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: onAdd,
    })
    go informer.Run(stopper)
    if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }
    <-stopper
}

// onAdd is the function executed when the kubernetes informer notified the
// presence of a new kubernetes node in the cluster
func onAdd(obj interface{}) {
    // Cast the obj as node
    node := obj.(*corev1.Node)
    _, ok := node.GetLabels()[K8S_LABEL_AWS_REGION]
    if ok {
        fmt.Printf("It has the label!")
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CNCF 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档