运维团队希望管控部署在k8s集群里对外暴露的服务,开发团队无需关心服务如何暴露给用户。
开发团队创建应用的manifests,并可以为Service资源添加annotation `ingress/http: true`,来决定终端用户是否可以访问到该服务,默认不能访问到。至于具体如何让用户可以访问到服务,开发团队不需要关心。
custom controller需要监听Service资源,当Service发生变化时,根据其annotation创建或删除对应的Ingress(具体判断逻辑见下文)。
Ingress Controller使用nginx ingress controller,类似集群网关(ingress controller也有apisix实现),它根据Ingress为我们更新nginx的配置;最后,终端用户便可以通过Ingress Controller的地址访问到开发团队指定的服务。
检查Service是否带有 `ingress/http: true` 的annotation
,有的话,查询对应的ingress是否存在,不存在就创建ingress

// main wires the controller together: it builds a client configuration,
// creates the clientset and shared informer factory, registers the controller
// on the Service and Ingress informers, starts the informers, and then runs
// the controller until the process exits.
func main() {
	// config: try the default kubeconfig in the home directory first and fall
	// back to the in-cluster configuration when that fails.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		clusterConfig, err := rest.InClusterConfig()
		if err != nil {
			panic(err)
		}
		config = clusterConfig
	}

	// client
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// factory
	factory := informers.NewSharedInformerFactory(clientset, 0)

	// informer
	serviceInformer := factory.Core().V1().Services()
	ingressInformer := factory.Networking().V1().Ingresses()

	// addEvent: the controller must be created before factory.Start so that
	// the informers it requests are known to the factory when it starts.
	controller := NewController(clientset, serviceInformer, ingressInformer)

	// start
	stopCh := make(chan struct{})
	factory.Start(stopCh)
	// Refuse to run workers against a cache that did not finish syncing.
	for informerType, synced := range factory.WaitForCacheSync(stopCh) {
		if !synced {
			panic(fmt.Sprintf("cache for %v failed to sync", informerType))
		}
	}
	controller.Run(stopCh)
}
// workNum is the number of worker goroutines started by Controller.Run.
const workNum = 5

// maxRetry is the requeue-count threshold that Controller.handleErr compares
// against before it stops retrying a key.
const maxRetry = 10
type Controller struct {
// 从indexer里获取数据
serviceLister v13.ServiceLister
ingressLister v14.IngressLister
client *kubernetes.Clientset
queue workqueue.RateLimitingInterface
}
// NewController builds a Controller that uses client for writes and the given
// informers for cached reads, and registers its event handlers on both
// informers:
//
//   - Service add/update events enqueue the Service key.
//   - Ingress delete events enqueue the Ingress key, so an Ingress that is
//     deleted while its Service still exists gets recreated.
//
// A handler registration failure is reported through runtime.HandleError
// rather than being silently dropped; the controller is still returned because
// the signature has no error result.
func NewController(client *kubernetes.Clientset, serviceInformer v1.ServiceInformer, ingressInformer v12.IngressInformer) *Controller {
	c := &Controller{
		client:        client,
		serviceLister: serviceInformer.Lister(),
		ingressLister: ingressInformer.Lister(),
		queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
			Name: "ingress",
		}),
	}

	if _, err := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    c.addService,
		UpdateFunc: c.updateService,
	}); err != nil {
		runtime.HandleError(fmt.Errorf("registering service event handler: %w", err))
	}

	// When an ingress is deleted it is recreated as long as the service still exists.
	if _, err := ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: c.deleteIngress,
	}); err != nil {
		runtime.HandleError(fmt.Errorf("registering ingress event handler: %w", err))
	}

	return c
}
// addService is the Service informer's add callback. It logs the event and
// hands the object to enqueue so that its key gets reconciled.
func (c Controller) addService(obj interface{}) {
	fmt.Println("add service")

	c.enqueue(obj)
}
// updateService is the Service informer's update callback. It logs every
// event but only enqueues the new object when it differs from the old one
// under reflect.DeepEqual.
func (c Controller) updateService(oldObj interface{}, newObj interface{}) {
	fmt.Println("update service")

	if changed := !reflect.DeepEqual(oldObj, newObj); changed {
		c.enqueue(newObj)
	}
}
// deleteIngress is the Ingress informer's delete callback. When the deleted
// Ingress carries the "ingress/http" annotation, its key is enqueued so that
// syncService can recreate it if the Service still asks for one.
//
// The informer may deliver a cache.DeletedFinalStateUnknown tombstone instead
// of the object itself when a delete event was missed; the tombstone is
// unwrapped rather than letting a bare type assertion panic. Objects of any
// other type are reported and ignored.
func (c Controller) deleteIngress(obj interface{}) {
	fmt.Println("delete ingress")

	ig, ok := obj.(*v15.Ingress)
	if !ok {
		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			runtime.HandleError(fmt.Errorf("unexpected object type %T in ingress delete event", obj))
			return
		}
		ig, ok = tombstone.Obj.(*v15.Ingress)
		if !ok {
			runtime.HandleError(fmt.Errorf("tombstone contained unexpected object type %T", tombstone.Obj))
			return
		}
	}

	if _, managed := ig.Annotations["ingress/http"]; !managed {
		return
	}

	// Recreate the ingress. The unwrapped object is enqueued (not obj) so that
	// key extraction also works for tombstones.
	c.enqueue(ig)
}
// enqueue derives the namespace/name key of obj and adds it to the work
// queue. If no key can be derived, the error is reported through
// runtime.HandleError and nothing is queued.
func (c Controller) enqueue(obj interface{}) {
	if key, err := cache.MetaNamespaceKeyFunc(obj); err != nil {
		runtime.HandleError(err)
	} else {
		c.queue.Add(key)
	}
}
// Run starts workNum worker goroutines and then blocks until ch is closed or
// delivers a value. On return the work queue is shut down, which makes
// queue.Get report shutdown so the workers leave their loops instead of
// staying blocked forever.
func (c Controller) Run(ch chan struct{}) {
	defer c.queue.ShutDown()

	fmt.Println("controller run")
	for i := 0; i < workNum; i++ {
		go c.work()
	}
	<-ch
}
// work is the body of one worker goroutine: it keeps processing queue items
// until processNextItem reports that no more work will arrive.
func (c Controller) work() {
	for {
		if more := c.processNextItem(); !more {
			return
		}
	}
}
// processNextItem takes one key from the work queue and reconciles it with
// syncService. It returns false only when the queue has been shut down, which
// tells the worker loop to stop; in every other case it returns true.
//
// A failed sync is passed to handleErr for retry handling. A successful sync
// calls Forget so the rate limiter's failure history for that key is cleared.
func (c Controller) processNextItem() bool {
	item, shutdown := c.queue.Get()
	if shutdown {
		return false
	}
	// Done must always be called so the queue can hand the key out again.
	defer c.queue.Done(item)

	key, ok := item.(string)
	if !ok {
		// Only string keys are ever added; drop anything else instead of panicking.
		c.queue.Forget(item)
		runtime.HandleError(fmt.Errorf("expected string key in work queue, got %T", item))
		return true
	}

	if err := c.syncService(key); err != nil {
		// Retry on failure.
		c.handleErr(key, err)
		return true
	}

	c.queue.Forget(key)
	return true
}
// handleErr decides what happens to key after a failed sync. While the key has
// been requeued fewer than maxRetry times it is added back through the rate
// limiter; after that it is dropped from the rate limiter's bookkeeping and
// the error is reported through runtime.HandleError.
//
// AddRateLimited is required here: NumRequeues only counts rate-limited adds,
// so requeueing with a plain Add would never reach the retry limit and would
// retry immediately, without backoff.
func (c Controller) handleErr(key string, err error) {
	if c.queue.NumRequeues(key) < maxRetry {
		c.queue.AddRateLimited(key)
		return
	}

	c.queue.Forget(key)
	runtime.HandleError(fmt.Errorf("giving up on %q after %d retries: %w", key, maxRetry, err))
}
// syncService reconciles the Ingress that has the same namespace and name as
// the Service identified by key ("namespace/name"):
//
//   - Service has the "ingress/http" annotation and no Ingress exists:
//     create the Ingress.
//   - Service is missing or lacks the annotation, and an Ingress exists that
//     itself carries the "ingress/http" annotation: delete the Ingress.
//   - Anything else: do nothing.
//
// Only the presence of the annotation key is checked, not its value.
//
// It returns an error when the key is malformed, when a lister fails for a
// reason other than "not found", or when the create/delete call fails. An
// AlreadyExists on create and a NotFound on delete are treated as success,
// because they mean the desired state was reached while the local cache was
// stale.
func (c Controller) syncService(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	ingress, err := c.ingressLister.Ingresses(namespace).Get(name)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	ingressExist := err == nil

	service, err := c.serviceLister.Services(namespace).Get(name)
	if err != nil && !errors.IsNotFound(err) {
		return err
	}
	annotationExist := false
	if err == nil {
		_, annotationExist = service.Annotations["ingress/http"]
	}

	switch {
	case annotationExist && !ingressExist:
		if err := c.createIngress(service); err != nil && !errors.IsAlreadyExists(err) {
			return err
		}
	case !annotationExist && ingressExist:
		// Never remove an Ingress this controller did not create: every
		// Ingress built by createIngress carries the marker annotation.
		if _, managed := ingress.Annotations["ingress/http"]; !managed {
			return nil
		}
		err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, v16.DeleteOptions{})
		if err != nil && !errors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
// createIngress creates an Ingress for service in the service's namespace and
// under the service's name. The Ingress:
//
//   - carries the "ingress/http": "true" annotation;
//   - has a controller owner reference to the service, so that deleting the
//     service lets the ingress be deleted as well;
//   - uses ingress class "nginx" and routes host "test.com", path prefix "/",
//     to the service.
//
// The backend port is 80 when the service exposes port 80 or declares no ports
// at all; otherwise the service's first declared port is used so the backend
// refers to a port that exists.
//
// It returns the error from the API create call, if any.
func (c Controller) createIngress(service *v17.Service) error {
	backendPort := int32(80)
	if len(service.Spec.Ports) > 0 {
		backendPort = service.Spec.Ports[0].Port
		for _, p := range service.Spec.Ports {
			if p.Port == 80 {
				backendPort = 80
				break
			}
		}
	}

	pathType := v15.PathTypePrefix
	ingressClassName := "nginx"

	ingress := &v15.Ingress{}
	ingress.Name = service.Name
	ingress.Namespace = service.Namespace
	ingress.Annotations = map[string]string{
		"ingress/http": "true",
	}
	// Deleting the service also deletes the ingress. The owner's group-version
	// must come from v17, the package that declares Service; taking it from
	// v16 (the meta package) would record the meta API group as the owner's
	// apiVersion, which is not the apiVersion of a Service.
	ingress.ObjectMeta.OwnerReferences = []v16.OwnerReference{
		*v16.NewControllerRef(service, v17.SchemeGroupVersion.WithKind("Service")),
	}
	ingress.Spec = v15.IngressSpec{
		IngressClassName: &ingressClassName,
		Rules: []v15.IngressRule{
			{
				Host: "test.com",
				IngressRuleValue: v15.IngressRuleValue{
					HTTP: &v15.HTTPIngressRuleValue{
						Paths: []v15.HTTPIngressPath{
							{
								Path:     "/",
								PathType: &pathType,
								Backend: v15.IngressBackend{
									Service: &v15.IngressServiceBackend{
										Name: service.Name,
										Port: v15.ServiceBackendPort{Number: backendPort},
									},
								},
							},
						},
					},
				},
			},
		},
	}

	_, err := c.client.NetworkingV1().Ingresses(service.Namespace).Create(context.TODO(), ingress, v16.CreateOptions{})
	return err
}
Post Views: 8