前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于源码深入浅出来理解k8s的services工作原理

基于源码深入浅出来理解k8s的services工作原理

作者头像
机械视角
发布2021-01-06 15:24:11
8650
发布2021-01-06 15:24:11
举报
文章被收录于专栏:TensorbytesTensorbytes

k8s 中的 services 就是一组同label类型 pod 的服务抽象,为服务提供了LB和反向代理的能力,集群中表示一个微服务的概念。kube-proxy 则是 service 具体实现,这里从工作机制(宏观)到源码(微观)解读 services 实现原理。

services 和 endpoints

先对 k8s 中两个基本的网络概念来做个简单的回顾,services 和 endpoints,下面是 services 和 endpoints 资源:

services:

代码语言:javascript
复制
apiVersion: v1
kind: Service
metadata:
  name: playmate-model
  namespace: rcmd
spec:
  clusterIP: 10.247.168.174
  ports:
  - name: grpc
    port: 8000
    protocol: TCP
    targetPort: 8000
  selector:
    app: playmate-model
  sessionAffinity: None
  type: ClusterIP

endpoints

代码语言:javascript
复制
apiVersion: v1
kind: Endpoints
metadata:
  name: playmate-model
  namespace: rcmd
subsets:
- addresses:
  - ip: 10.0.2.137
    nodeName: 10.213.20.91
    targetRef:
      kind: Pod
      name: playmate-model-8cff8978f-mwtkt
      namespace: rcmd
      resourceVersion: "17169006"
      uid: 86898a37-4ff5-441d-a863-c70fa65d78f0
  notReadyAddresses:
  - ip: 10.0.2.130
    nodeName: 10.213.20.91
    targetRef:
      kind: Pod
      name: playmate-model-89c8d5f8f-t2r5n
      namespace: rcmd
      resourceVersion: "15708217"
      uid: 6884d0c3-e37f-4907-9b43-2903224f7773
  ports:
  - name: grpc
    port: 8000
    protocol: TCP

可以看出,最简单的 services 结构包含了一个 clusterIP,暴露端口和一个 selector 选择器。 endpoints 对象包含末端目标的ip地址和目标点的pod的信息。

而我们一般创建 serivces 并不需要手动创建 endpoints,集群会尝试根据 selector 找到对应的pod信息,然后基于找到的匹配的pod信息创建 pod 为后端的 endpoints 对象,流程如下图:

当然我们也可以手动创建 endpoints,比如我们为外部服务提供集群内的服务发现可以手动设置 services 和 endpoints:

代码语言:javascript
复制
apiVersion: v1
kind: Service
metadata:
  name: hbase-broker-1
  namespace: rcmd
spec:
  clusterIP: 10.247.180.39
  ports:
  - port: 2181
    protocol: TCP
    targetPort: 2181
  sessionAffinity: None
  type: ClusterIP
代码语言:javascript
复制
apiVersion: v1
kind: Endpoints
metadata:
  name: hbase-broker-1
  namespace: rcmd
subsets:
- addresses:
  - ip: 10.10.14.115 # 外部服务地址
  ports:
  - port: 2181
    protocol: TCP

从 kube-proxy 看 services 工作机制

kube-proxy 是负责 services 和 endpoints 在各节点的具体实现,kube-proxy 和 kubelet 一样会在每个节点都运行一个实例,为 services 提供做简单的 TCP, UDP和 SCTP 流量转发,转发到对应的目标(endpoints)。下面通过 kube-proxy 源码解读可以更好地了解 services 和 endpoints 的运作机制。

这里是基于release-1.17,commit sha:15600ef9855dbdfd6e07f6a76e660b1b6030387e

先从 cmd/kube-proxy/proxy.go 开始:

代码语言:javascript
复制
...
func main() {
    ...
    command := app.NewProxyCommand()
    ...
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}

这是个k8s命令的标准源码,声明命令调用Execute(读过 k8s 命令源码得都知道都是这个讨论~:)

看下 NewProxyCommond方法:

代码语言:javascript
复制
// NewProxyCommand creates a *cobra.Command object with default parameters
func NewProxyCommand() *cobra.Command {
    opts := NewOptions()

    cmd := &cobra.Command{
        Use: "kube-proxy",
        Long: `The Kubernetes network proxy runs on each node. This
reflects services as defined in the Kubernetes API on each node and can do simple
TCP, UDP, and SCTP stream forwarding or round robin TCP, UDP, and SCTP forwarding across a set of backends.
Service cluster IPs and ports are currently found through Docker-links-compatible
environment variables specifying ports opened by the service proxy. There is an optional
addon that provides cluster DNS for these cluster IPs. The user must create a service
with the apiserver API to configure the proxy.`,
        Run: func(cmd *cobra.Command, args []string) {
            verflag.PrintAndExitIfRequested()
            cliflag.PrintFlags(cmd.Flags())

            if err := initForOS(opts.WindowsService); err != nil {
                klog.Fatalf("failed OS init: %v", err)
            }

            if err := opts.Complete(); err != nil {
                klog.Fatalf("failed complete: %v", err)
            }
            if err := opts.Validate(); err != nil {
                klog.Fatalf("failed validate: %v", err)
            }
      // 这里是执行 -->
            if err := opts.Run(); err != nil {
                klog.Exit(err)
            }
        },
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
    ...
    return cmd
}

// Run runs the specified ProxyServer.
func (o *Options) Run() error {
    defer close(o.errCh)
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }
    // 声明一个 proxyserver 对象
    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }

    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }

    o.proxyServer = proxyServer
    // proxyserver 执行
    return o.runLoop()
}

// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
    if o.watcher != nil {
        o.watcher.Run()
    }

    // run the proxy in goroutine
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}

Kubernetes 网络代理在每个节点上运行。网络代理反映了每个节点上 Kubernetes API 中定义的服务,并且可以执行简单的 TCP、UDP 和 SCTP 流转发,或者在一组后端进行循环 TCP、UDP 和 SCTP 转发。当前可通过 Docker-links-compatible 环境变量找到服务集群 IP 和端口,这些环境变量指定了服务代理打开的端口。有一个可选的插件,可以为这些集群 IP 提供集群 DNS。用户必须使用 apiserver API 创建服务才能配置代理。

cobra.Command 是一个标准的 k8s 命令行结构体,直接看 RUN 方法就可以了,RUN 里面有一个段对 kube-proxy 命令的描述:

Kubernetes 网络代理在每个节点上运行。网络代理反映了每个节点上 Kubernetes API 中定义的服务,并且可以执行简单的 TCP、UDP 和 SCTP 流转发,或者在一组后端进行循环 TCP、UDP 和 SCTP 转发。当前可通过 Docker-links-compatible 环境变量找到服务集群 IP 和端口,这些环境变量指定了服务代理打开的端口。有一个可选的插件,可以为这些集群 IP 提供集群 DNS。用户必须使用 apiserver API 创建服务才能配置代理。

这里核心的是 opts.Run() 方法,进去后可以看到一个 NewProxyServer 声明了一个ProxyServer结构体,调用了o.proxyServer.Run()方法,我们先看看 proxyServer 结构体。

ProxyServer 三种运行模式

代码语言:javascript
复制
// NewProxyServer returns a new ProxyServer.
func NewProxyServer(o *Options) (*ProxyServer, error) {
    return newProxyServer(o.config, o.CleanupAndExit, o.master)
}

func newProxyServer(
    config *proxyconfigapi.KubeProxyConfiguration,
    cleanupAndExit bool,
    master string) (*ProxyServer, error) {
    ...
    proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
    ...
    if proxyMode == proxyModeIPTables {
        klog.V(0).Info("Using iptables Proxier.")
        ...
        proxier, err = iptables.NewProxier(
            iptInterface,
            utilsysctl.New(),
            execer,
            config.IPTables.SyncPeriod.Duration,
            config.IPTables.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR,
            hostname,
            nodeIP,
            recorder,
            healthzServer,
            config.NodePortAddresses,
        )
        ...
    } else if proxyMode == proxyModeIPVS {
        klog.V(0).Info("Using ipvs Proxier.")
        ...
        proxier, err = ipvs.NewProxier(
            iptInterface,
            ipvsInterface,
            ipsetInterface,
            utilsysctl.New(),
            execer,
            config.IPVS.SyncPeriod.Duration,
            config.IPVS.MinSyncPeriod.Duration,
            config.IPVS.ExcludeCIDRs,
            config.IPVS.StrictARP,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR,
            hostname,
            nodeIP,
            recorder,
            healthzServer,
            config.IPVS.Scheduler,
            config.NodePortAddresses,
        )
        ...
    } else {
        klog.V(0).Info("Using userspace Proxier.")

        // TODO this has side effects that should only happen when Run() is invoked.
        proxier, err = userspace.NewProxier(
            userspace.NewLoadBalancerRR(),
            net.ParseIP(config.BindAddress),
            iptInterface,
            execer,
            *utilnet.ParsePortRangeOrDie(config.PortRange),
            config.IPTables.SyncPeriod.Duration,
            config.IPTables.MinSyncPeriod.Duration,
            config.UDPIdleTimeout.Duration,
            config.NodePortAddresses,
        )
        ...
    }
    return &ProxyServer{
        Client:                 client,
        EventClient:            eventClient,
        IptInterface:           iptInterface,
        IpvsInterface:          ipvsInterface,
        IpsetInterface:         ipsetInterface,
        execer:                 execer,
        Proxier:                proxier,
        Broadcaster:            eventBroadcaster,
        Recorder:               recorder,
        ConntrackConfiguration: config.Conntrack,
        Conntracker:            &realConntracker{},
        ProxyMode:              proxyMode,
        NodeRef:                nodeRef,
        MetricsBindAddress:     config.MetricsBindAddress,
        EnableProfiling:        config.EnableProfiling,
        OOMScoreAdj:            config.OOMScoreAdj,
        ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
        HealthzServer:          healthzServer,
        UseEndpointSlices:      utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice),
    }, nil
}

NewProxyServer 提供了三种运行模式,iptables、IPVS和 userspace,userspace 代理模式算比较旧的一种模式,在 Kubernetes v1.0 中开始使用 user space, v1.2的时候默认模式已经改为 iptables 了,现在大部分集群中都是这种模式。

userspace 模式其实就是直接通过kube-proxy 将数据包转发到后端 Pods,kube-proxy 在这里起到了路由规则下发、包转发规则、负载均衡的功能,由于 kube-proxy 是运行在用户空间的,会存在用户空间和内核空间的频繁切换,这对性能影响很大,所以后面默认就换成 iptables 了。 iptables 基于 netfilter 实现,所有操作都在内核空间相比基于 kube-proxy 直接做转发和负载均衡在性能上得到很大的提升。这里 kube-proxy 只是起到设置 iptables 的规则作用。 另一个是 IPVS, IPVS 在性能上比 iptables 更进一步,底层和 iptables 一样是基于 netfilter ,但IPVS 基于hash tabels来存储网络转发规则相比于 iptables 这种线性 O(n) 的算法要快很多。

但 1.17 版下 iptables 还是 kube-proxy 的默认选项,应该用得人也是最多得,这里就只介绍 iptables 的转发方式。

kube-proxy核心运行逻辑

okay,了解完 ProxyServer 结构,我们继续看看 kube-proxy 核心运行逻辑,也就是 Run()方法:

代码语言:javascript
复制
// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
    // 监控检查
    // Start up a healthz server if requested
    if s.HealthzServer != nil {
        s.HealthzServer.Run()
    }
    // metrics 指标上报
    // Start up a metrics server if requested
    if len(s.MetricsBindAddress) > 0 {
        proxyMux := mux.NewPathRecorderMux("kube-proxy")
        healthz.InstallHandler(proxyMux)
        proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
            w.Header().Set("Content-Type", "text/plain; charset=utf-8")
            w.Header().Set("X-Content-Type-Options", "nosniff")
            fmt.Fprintf(w, "%s", s.ProxyMode)
        })
        proxyMux.Handle("/metrics", legacyregistry.Handler())
        ...
    }
    ...

    // 通过 client-go 的 informer 像 api-server 获取信息
    // Make informers that filter out objects that want a non-default service proxy.
    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *metav1.ListOptions) {
            options.LabelSelector = labelSelector.String()
        }))

    // Create configs (i.e. Watches for Services and Endpoints or EndpointSlices)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    // 创建 services
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.Proxier)
    // serviceconfig 执行
    go serviceConfig.Run(wait.NeverStop)
    // 创建 endpoints 或 endpointSlice
    if s.UseEndpointSlices {
        endpointSliceConfig := config.NewEndpointSliceConfig(informerFactory.Discovery().V1beta1().EndpointSlices(), s.ConfigSyncPeriod)
        endpointSliceConfig.RegisterEventHandler(s.Proxier)
        go endpointSliceConfig.Run(wait.NeverStop)
    } else {
        endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
        endpointsConfig.RegisterEventHandler(s.Proxier)
        go endpointsConfig.Run(wait.NeverStop)
    }
    informerFactory.Start(wait.NeverStop)
    ...

    // Just loop forever for now...
    s.Proxier.SyncLoop()
    return nil
}

从这里可以看到, ProxyServer 主要包括几步:

  • 监控检查
  • metrics数据上报
  • 通过 client-go 从apiserver 获取 services 和 endpoints/endpointSlice 配置
  • 创建 services 和 endpoints/endpointSlice
  • 进入循环

kube-proxy在 iptables 实现

在了解 ProxyServer 结构和运行原理之后,我们来看看 kube-proxy 是如何通过 iptables 创建 services 和 endpoints 的。

NewServiceConfig和NewEndpointSliceConfig

ServiceConfig 和 EndpointsConfig 是kube-proxy中用于监听service变化的组件,核心是三个方法AddFuncUpdateFuncDeleteFunc

代码语言:javascript
复制
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{
        listerSynced: serviceInformer.Informer().HasSynced,
    }

    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    return result
}

// NewEndpointsConfig creates a new EndpointsConfig.
func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
    result := &EndpointsConfig{
        listerSynced: endpointsInformer.Informer().HasSynced,
    }

    endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddEndpoints,
            UpdateFunc: result.handleUpdateEndpoints,
            DeleteFunc: result.handleDeleteEndpoints,
        },
        resyncPeriod,
    )

    return result
}

以 service 为例(endpoints和services类似)三个方法分别对应着OnServiceAddOnServiceUpdateOnServiceDelete,这三个都是接口方法。

代码语言:javascript
复制
func (c *ServiceConfig) handleAddService(obj interface{}) {
    service, ok := obj.(*v1.Service)
    ...
    for i := range c.eventHandlers {
        klog.V(4).Info("Calling handler.OnServiceAdd")
        c.eventHandlers[i].OnServiceAdd(service)
    }
}

func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
    oldService, ok := oldObj.(*v1.Service)
    service, ok := newObj.(*v1.Service)
    ...
    for i := range c.eventHandlers {
        klog.V(4).Info("Calling handler.OnServiceUpdate")
        c.eventHandlers[i].OnServiceUpdate(oldService, service)
    }
}

func (c *ServiceConfig) handleDeleteService(obj interface{}) {
    service, ok := obj.(*v1.Service)
    ...
    for i := range c.eventHandlers {
        klog.V(4).Info("Calling handler.OnServiceDelete")
        c.eventHandlers[i].OnServiceDelete(service)
    }
}

这些接口的具体实现我们可以找到对应的实现,这里以 iptables 为例,因为找到 pkg/proxy/iptables/proxier.go 下面 Proxier 对应方法实现:

代码语言:javascript
复制
// OnServiceAdd is called whenever creation of new service object
// is observed.
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)

}
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.Sync()
    }
}

可以看到这增、删、改三个方法都是 Update实现的

serviceConfig.Run和endpointsConfig.Run

serviceConfig.Run 方法:

代码语言:javascript
复制
// RegisterEventHandler registers a handler which is called on every service change.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
    c.eventHandlers = append(c.eventHandlers, handler)
}

// Run waits for cache synced and invokes handlers after syncing.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
    klog.Info("Starting service config controller")

    if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
        return
    }

    for i := range c.eventHandlers {
        klog.V(3).Info("Calling handler.OnServiceSynced()")
        c.eventHandlers[i].OnServiceSynced()
    }
}

这里的 eventHandler 就是一个包含 Proxier 的数组,这里核心是OnServiceSynced方法,找到具体iptables proxier 的这个方法的具体实现:

代码语言:javascript
复制
// OnServiceSynced is called once all the initial even handlers were
// called and the state is fully propagated to local cache.
func (proxier *Proxier) OnServiceSynced() {
    proxier.mu.Lock()
    proxier.servicesSynced = true
    if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
        proxier.setInitialized(proxier.endpointSlicesSynced)
    } else {
        proxier.setInitialized(proxier.endpointsSynced)
    }
    proxier.mu.Unlock()

    // Sync unconditionally - this is called once per lifetime.
    proxier.syncProxyRules()
}

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
    ...
}

kube-proxy 在 iptables 规则

到了核心的 syncProxyRules() 方法了,我们先看关键 iptables 的 chain 创建:

代码语言:javascript
复制
func (proxier *Proxier) syncProxyRules() {
  ...
  // Create and link the kube chains.
  // 创建链表
	for _, jump := range iptablesJumpChains {
		if _, err := proxier.iptables.EnsureChain(jump.table, jump.dstChain); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s exists: %v", jump.table, jump.dstChain, err)
			return
		}
		args := append(jump.extraArgs,
			"-m", "comment", "--comment", jump.comment,
			"-j", string(jump.dstChain),
		)
		if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jump.table, jump.srcChain, args...); err != nil {
			klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jump.table, jump.srcChain, jump.dstChain, err)
			return
		}
  }
  ...
	// Write table headers.
	writeLine(proxier.filterChains, "*filter")
	writeLine(proxier.natChains, "*nat")

	// Make sure we keep stats for the top-level chains, if they existed
  // (which most should have because we created them above).
  // 会获取所有存在的chains rule,然后将新的 chains rule 加入到最前面
	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} {
		if chain, ok := existingFilterChains[chainName]; ok {
			writeBytesLine(proxier.filterChains, chain)
		} else {
			writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName))
		}
	}
	for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} {
		if chain, ok := existingNATChains[chainName]; ok {
			writeBytesLine(proxier.natChains, chain)
		} else {
			writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName))
		}
  }
  ...
}

var iptablesJumpChains = []iptablesJumpChain{
	{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
	{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
	{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
	{utiliptables.TableNAT, kubePostroutingChain, utiliptables.ChainPostrouting, "kubernetes postrouting rules", nil},
}

const (
	// the services chain
	kubeServicesChain utiliptables.Chain = "KUBE-SERVICES"
	// the external services chain
	kubeExternalServicesChain utiliptables.Chain = "KUBE-EXTERNAL-SERVICES"
	// the nodeports chain
	kubeNodePortsChain utiliptables.Chain = "KUBE-NODEPORTS"
	// the kubernetes postrouting chain
	kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING"
	// KubeMarkMasqChain is the mark-for-masquerade chain
	KubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
	// KubeMarkDropChain is the mark-for-drop chain
	KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP"
	// the kubernetes forward chain
	kubeForwardChain utiliptables.Chain = "KUBE-FORWARD"
)

可以看到 kube-proxy 的 chain 都是在 filter 表和 nat 表下创建的。

看 nat 表:

代码语言:javascript
复制
...
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES

...
-A KUBE-SERVICES -d 10.247.91.74/32 -p tcp -m comment --comment "rcmd/playmate-rank:grpc cluster IP" -m tcp --dport 8000 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -d 10.247.91.74/32 -p tcp -m comment --comment "rcmd/playmate-rank:grpc cluster IP" -m tcp --dport 8000 -j KUBE-SVC-YTWGRZ3E3MPBXGU3

...
-A KUBE-SVC-YTWGRZ3E3MPBXGU3 -j KUBE-SEP-EVJ6H5FW5OUSCV2Y

...
-A KUBE-SEP-EVJ6H5FW5OUSCV2Y -s 10.0.2.250/32 -j KUBE-MARK-MASQ
-A KUBE-SEP-EVJ6H5FW5OUSCV2Y -p tcp -m tcp -j DNAT --to-destination 10.0.2.250:8000

对于LB的路由路径:

代码语言:javascript
复制
...
-A KUBE-SERVICES -d 10.247.248.210/32 -p tcp -m comment --comment "istio-system/istio-ingressgateway-external:rcmd cluster IP" -m tcp --dport 8000 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -d 10.247.248.210/32 -p tcp -m comment --comment "istio-system/istio-ingressgateway-external:rcmd cluster IP" -m tcp --dport 8000 -j KUBE-SVC-TBZWFMENS353FQVB
-A KUBE-SERVICES -d <公网ip>/32 -p tcp -m comment --comment "istio-system/istio-ingressgateway-external:rcmd loadbalancer IP" -m tcp --dport 8000 -j KUBE-FW-TBZWFMENS353FQVB

...
-A KUBE-FW-TBZWFMENS353FQVB -m comment --comment "istio-system/istio-ingressgateway-external:rcmd loadbalancer IP" -j KUBE-MARK-MASQ
-A KUBE-FW-TBZWFMENS353FQVB -m comment --comment "istio-system/istio-ingressgateway-external:rcmd loadbalancer IP" -j KUBE-SVC-TBZWFMENS353FQVB
-A KUBE-FW-TBZWFMENS353FQVB -m comment --comment "istio-system/istio-ingressgateway-external:rcmd loadbalancer IP" -j KUBE-MARK-DROP

对于 nodeport 的形式:

代码语言:javascript
复制
...
-A KUBE-SERVICES -d <节点ip>/32 -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -j KUBE-NODEPORTS
...
-A KUBE-NODEPORTS -p tcp -m comment --comment "istio-system/istio-ingressgateway-external:rcmd" -m tcp --dport <目标端口> -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "istio-system/istio-ingressgateway-external:rcmd" -m tcp --dport <目标端口> -j KUBE-SVC-TBZWFMENS353FQVB

这里的 KUBE-SVC-YTWGRZ3E3MPBXGU3KUBE-SEP-EVJ6H5FW5OUSCV2YKUBE-FW-TBZWFMENS353FQVB 这些是通过对 servicePortName 和 协议生成:

代码语言:javascript
复制
// servicePortChainName takes the ServicePortName for a service and
// returns the associated iptables chain.  This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-SVC-".
func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain {
	return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol))
}

// serviceFirewallChainName takes the ServicePortName for a service and
// returns the associated iptables chain.  This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-FW-".
func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
	return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol))
}

// serviceLBPortChainName takes the ServicePortName for a service and
// returns the associated iptables chain.  This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-XLB-".  We do
// this because IPTables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read.
func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain {
	return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol))
}

func portProtoHash(servicePortName string, protocol string) string {
	hash := sha256.Sum256([]byte(servicePortName + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return encoded[:16]
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • services 和 endpoints
  • 从 kube-proxy 看 services 工作机制
    • ProxyServer 三种运行模式
      • kube-proxy核心运行逻辑
        • kube-proxy在 iptables 实现
          • NewServiceConfig和NewEndpointSliceConfig
          • serviceConfig.Run和endpointsConfig.Run
        • kube-proxy 在 iptables 规则
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档