一、如何控制nginx负载均衡算法
下面这段配置中 balancer.balance() 调用的 lua 程序位于:rootfs/etc/nginx/lua/balancer.lua 脚本中
# Single upstream whose only server entry is a placeholder; balancer.balance()
# is invoked from balancer_by_lua_block.
upstream upstream_balancer {
    server 0.0.0.1; # placeholder

    balancer_by_lua_block {
        balancer.balance()
    }

    # The keepalive directives below are rendered only when
    # UpstreamKeepaliveConnections is greater than 0.
    {{ if (gt $cfg.UpstreamKeepaliveConnections 0) }}
    keepalive {{ $cfg.UpstreamKeepaliveConnections }};
    keepalive_timeout {{ $cfg.UpstreamKeepaliveTimeout }}s;
    keepalive_requests {{ $cfg.UpstreamKeepaliveRequests }};
    {{ end }}
}
https://github.com/kubernetes/ingress-nginx/blob/3db1dc120260a345144942207392c8c0200c5582/rootfs/etc/nginx/lua/balancer.lua
二、nginx controller 启动命令程序
该命令位于:cmd/nginx/main.go
https://github.com/kubernetes/ingress-nginx/blob/be3ff428583dd37279dfe33613f59ff9765029f6/cmd/nginx/main.go
// Build the controller, then run handleSigterm in its own goroutine; the
// callback it receives exits the process with the supplied code.
ngx := controller.NewNGINXController(conf, mc, fs)
go handleSigterm(ngx, func(code int) { os.Exit(code) })
开启一个协程处理 SIGTERM 信号
三、NewNGINXController 函数
该函数位于:internal/ingress/controller/nginx.go
https://github.com/kubernetes/ingress-nginx/blob/b06e114177fd7aea141a53a5f2845eaffc7f1d6d/internal/ingress/controller/nginx.go
// NewNGINXController creates a new NGINX Ingress controller. func NewNGINXController(config *Configuration, mc metric.Collector, fs file.Filesystem) *NGINXController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ Interface: config.Client.CoreV1().Events(config.Namespace), }) h, err := dns.GetSystemNameServers()
// 采用系统的dns
if err != nil { klog.Warningf("Error reading system nameservers: %v", err) } n := &NGINXController{ isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, cfg: config, syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ Component: "nginx-ingress-controller", }), stopCh: make(chan struct{}), updateCh: channels.NewRingChannel(1024), stopLock: &sync.Mutex{}, fileSystem: fs, runningConfig: new(ingress.Configuration), Proxy: &TCPProxy{}, metricCollector: mc, command: NewNginxCommand(), } if n.cfg.ValidationWebhook != "" { n.validationWebhookServer = &http.Server{ Addr: config.ValidationWebhook, Handler: adm_controler.NewAdmissionControllerServer(&adm_controler.IngressAdmission{Checker: n}), TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(), } }
//获取Pod信息 pod, err := k8s.GetPodDetails(config.Client) if err != nil { klog.Fatalf("unexpected error obtaining pod information: %v", err) } n.podInfo = pod n.store = store.New( config.EnableSSLChainCompletion, config.Namespace, config.ConfigMapName, config.TCPConfigMapName, config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, fs, n.updateCh, config.DynamicCertificatesEnabled, pod, config.DisableCatchAll)
//config map 更新 n.syncQueue = task.NewTaskQueue(n.syncIngress) if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(pod, status.Config{ Client: config.Client, PublishService: config.PublishService, PublishStatusAddress: config.PublishStatusAddress, IngressLister: n.store, UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, UseNodeInternalIP: config.UseNodeInternalIP, }) } else { klog.Warning("Update of Ingress status is disabled (flag --update-status)") } onTemplateChange := func() { template, err := ngx_template.NewTemplate(tmplPath, fs) if err != nil { // this error is different from the rest because it must be clear why nginx is not working klog.Errorf(` ------------------------------------------------------------------------------- Error loading new template: %v ------------------------------------------------------------------------------- `, err) return } n.t = template klog.Info("New NGINX configuration template loaded.") n.syncQueue.EnqueueTask(task.GetDummyObject("template-change")) } ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs) if err != nil { klog.Fatalf("Invalid NGINX configuration template: %v", err) } n.t = ngxTpl if _, ok := fs.(filesystem.DefaultFs); !ok { // do not setup watchers on tests return n } _, err = watch.NewFileWatcher(tmplPath, onTemplateChange) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", tmplPath, err) } filesToWatch := []string{} err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } filesToWatch = append(filesToWatch, path) return nil }) if err != nil { klog.Fatalf("Error creating file watchers: %v", err) } for _, f := range filesToWatch { _, err = watch.NewFileWatcher(f, func() { klog.Infof("File %v changed. Reloading NGINX", f) n.syncQueue.EnqueueTask(task.GetDummyObject("file-change")) }) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", f, err) } } return n }