Kubernetes kube-proxy: How It Works and Source Code Analysis

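This article summarizes kube-proxy, looks into how it is implemented internally, describes the drawbacks of its userspace and iptables modes, and uses an example for each mode to show what iptables does. In the next post I will analyze the kube-proxy source code in k8s v1.5; readers who are interested can follow along.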

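kube-proxy and Service background

Any discussion of kube-proxy has to mention the k8s Service, so here is a brief description of both. kube-proxy manages the access entry points of Services, covering both Pod-to-Service access inside the cluster and access to Services from outside the cluster. kube-proxy manages the Endpoints of each Service. A Service exposes a virtual IP, also called the Cluster IP, and accessing Cluster IP:Port from inside the cluster reaches the Pods behind that Service. A Service is an abstraction over a group of Pods chosen by a Selector. It is effectively a microservice that provides load balancing (LB) and reverse proxying, and kube-proxy's main job is to implement Services. A Service has another important role: the IPs of the backend Pods may change as Pods are created and destroyed, and the Service gives the application a fixed IP regardless of how the backend Endpoints change.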

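Service discovery

k8s provides two ways to do service discovery.

Environment variables: when a Pod is created, kubelet injects environment variables for the existing Services into that Pod. Note that for a Service's environment variables to be injected into a Pod, the Service must be created before the Pod. This ordering requirement makes the method close to unusable for service discovery.

For example, for a Service whose ServiceName is redis-master and whose ClusterIP:Port is 10.0.0.11:6379, the corresponding environment variables are: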

REDIS_MASTER_SERVICE_HOST=10.0.0.11
REDIS_MASTER_SERVICE_PORT=6379
REDIS_MASTER_PORT=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP_PROTO=tcp
REDIS_MASTER_PORT_6379_TCP_PORT=6379
REDIS_MASTER_PORT_6379_TCP_ADDR=10.0.0.11
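The naming pattern in the list above can be reproduced mechanically. The following is a minimal sketch, not kubelet code: `serviceEnvVars` is a hypothetical helper that assumes a single TCP port and derives the prefix by upper-casing the Service name and replacing dashes with underscores.

```go
package main

import (
	"fmt"
	"strings"
)

// serviceEnvVars is a hypothetical helper (not kubelet code). It rebuilds the
// variable names shown in the list above for a Service with one TCP port.
func serviceEnvVars(name, ip string, port int) []string {
	// "redis-master" becomes "REDIS_MASTER".
	prefix := strings.ToUpper(strings.ReplaceAll(name, "-", "_"))
	url := fmt.Sprintf("tcp://%s:%d", ip, port)
	portPrefix := fmt.Sprintf("%s_PORT_%d_TCP", prefix, port)
	return []string{
		fmt.Sprintf("%s_SERVICE_HOST=%s", prefix, ip),
		fmt.Sprintf("%s_SERVICE_PORT=%d", prefix, port),
		fmt.Sprintf("%s_PORT=%s", prefix, url),
		fmt.Sprintf("%s=%s", portPrefix, url),
		fmt.Sprintf("%s_PROTO=tcp", portPrefix),
		fmt.Sprintf("%s_PORT=%d", portPrefix, port),
		fmt.Sprintf("%s_ADDR=%s", portPrefix, ip),
	}
}

func main() {
	for _, kv := range serviceEnvVars("redis-master", "10.0.0.11", 6379) {
		fmt.Println(kv)
	}
}
```

A client inside the Pod would then read `REDIS_MASTER_SERVICE_HOST` and `REDIS_MASTER_SERVICE_PORT` with `os.Getenv`, which only works if the Service existed before the Pod was started.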

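DNS: this is the method that k8s officially and strongly recommends.

KubeDNS can easily be created as a cluster add-on to provide service discovery for the Services in the cluster.

For more about KubeDNS, see the article "Kubernetes DNS Service技术研究" (a technical study of the Kubernetes DNS Service).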
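From the client's point of view, DNS-based discovery is just a name lookup. The sketch below assumes the conventional `<service>.<namespace>.svc.<cluster-domain>` naming and the common default domain `cluster.local`; both the helper `serviceDNSName` and the Service name are illustrative. Outside a cluster the lookup simply fails.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// serviceDNSName builds the conventional in-cluster DNS name of a Service.
// The cluster domain is passed in because "cluster.local" is only the usual
// default, not a guarantee.
func serviceDNSName(service, namespace, clusterDomain string) string {
	return fmt.Sprintf("%s.%s.svc.%s", service, namespace, clusterDomain)
}

func main() {
	name := serviceDNSName("redis-master", "default", "cluster.local")
	fmt.Println("resolving", name)

	// Bound the lookup so the program does not hang where no cluster DNS exists.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	addrs, err := net.DefaultResolver.LookupHost(ctx, name)
	if err != nil {
		fmt.Println("lookup failed:", err)
		return
	}
	fmt.Println("addresses:", addrs)
}
```

Unlike the environment-variable approach, this does not depend on the Service having been created before the Pod.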

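Publishing (exposing) Services

Natively in k8s, the ServiceType of a Service determines how the Service is published.

1. ClusterIP: the default ServiceType in k8s. The Service is published internally through a ClusterIP inside the cluster.

2. NodePort: commonly used to expose a Service outside the cluster. The Endpoints behind the Service can be reached through NodeIP:NodePort on every node in the cluster.

3. LoadBalancer: also used to expose a Service outside the cluster. The difference is that it needs support from a cloud provider such as AWS.

4. ExternalName: also used to publish a service inside the cluster. It relies on KubeDNS (version >= 1.7): KubeDNS maps the Service to its ExternalName and returns a CNAME record.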

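kube-proxy internals

kube-proxy currently implements two proxy modes: userspace and iptables. userspace was the default mode in v1.0 and earlier. iptables mode was added in v1.1 and officially replaced userspace as the default in v1.2.

The two forwarding modes of kube-proxy

kube-proxy mainly forwards traffic in one of two modes, userspace and iptables.

With userspace mode (the default before k8s 1.2), external networks can access the cluster IP directly. With iptables mode (the default from k8s 1.2 onward), external networks cannot access the cluster IP directly. iptables mode is the more efficient of the two, but it requires iptables version >= 1.4.11.

userspace mode

userspace mode implements the Service proxy in user space, through kube-proxy itself. Its principle is shown in the following figure:

[Figure: how userspace mode works]

As the figure shows, the biggest problem with this mode is that a Service request first goes from user space into the kernel's iptables and then comes back to user space, where kube-proxy selects a backend Endpoint and does the proxying. The performance cost of moving traffic in and out of the kernel this way is unacceptable. This was the most heavily criticized aspect of kube-proxy in k8s v1.0 and earlier, which is why the community started working on iptables mode.

Example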

$ kubectl get service
NAME             LABELS                                    SELECTOR              IP(S)            PORT(S)
kubernetes       component=apiserver,provider=kubernetes                   10.254.0.1       443/TCP
ssh-service1     name=ssh,role=service                     ssh-service=true      10.254.132.107   2222/TCP

$ kubectl describe service ssh-service1 
Name:           ssh-service1
Namespace:      default
Labels:         name=ssh,role=service
Selector:       ssh-service=true
Type:           LoadBalancer
IP:         10.254.132.107
Port:              2222/TCP
NodePort:          30239/TCP
Endpoints:      
Session Affinity:   None
No events.

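NodePort works much like ClusterIP: a request sent to NodeIP:NodePort is redirected by iptables to the port that kube-proxy listens on for that Service (a random port on the node), and kube-proxy then forwards the request to one of the Pod:TargetPort endpoints.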
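What kube-proxy does once iptables has redirected the traffic to its local port can be sketched in a few lines of Go. This is a simplified illustration, not kube-proxy's code: the `roundRobin` type, the `serve` function and the backend addresses are all hypothetical, and real kube-proxy also handles UDP, session affinity and endpoint updates.

```go
package main

import (
	"fmt"
	"io"
	"net"
	"sync"
)

// roundRobin hands out backends in turn (hypothetical type).
type roundRobin struct {
	mu       sync.Mutex
	backends []string
	next     int
}

// pick returns the next backend, or "" if there are none.
func (r *roundRobin) pick() string {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.backends) == 0 {
		return ""
	}
	b := r.backends[r.next%len(r.backends)]
	r.next++
	return b
}

// serve accepts connections on ln and relays each one to a backend chosen by rr.
// Every byte crosses the kernel/user-space boundary twice, which is the cost
// this mode is criticized for.
func serve(ln net.Listener, rr *roundRobin) {
	for {
		client, err := ln.Accept()
		if err != nil {
			return // listener closed
		}
		go func() {
			defer client.Close()
			backend, err := net.Dial("tcp", rr.pick())
			if err != nil {
				return
			}
			defer backend.Close()
			go io.Copy(backend, client) // client -> Pod
			io.Copy(client, backend)    // Pod -> client
		}()
	}
}

func main() {
	// Hypothetical Pod addresses, for illustration only.
	rr := &roundRobin{backends: []string{"10.1.0.11:22", "10.1.0.12:22"}}

	// Port 0 asks the kernel for a random free port on the node.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println("proxy listening on", ln.Addr())
	go serve(ln, rr)

	// Show the selection order without opening any connection.
	fmt.Println(rr.pick(), rr.pick(), rr.pick())
}
```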

Here, assuming the node's IP is 10.0.0.5, the corresponding iptables rules are:

$ sudo iptables -S -t nat
...
-A KUBE-NODEPORT-CONTAINER -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j REDIRECT --to-ports 36463
-A KUBE-NODEPORT-HOST -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 30239 -j DNAT --to-destination 10.0.0.5:36463
-A KUBE-PORTALS-CONTAINER -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j REDIRECT --to-ports 36463
-A KUBE-PORTALS-HOST -d 10.254.132.107/32 -p tcp -m comment --comment "default/ssh-service1:" -m tcp --dport 2222 -j DNAT --to-destination 10.0.0.5:36463

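As shown, access to port 30239 on 10.0.0.5 is forwarded to port 36463 on the node (the randomly chosen listening port). Access to port 2222 of the clusterIP 10.254.132.107 is likewise forwarded to local port 36463. Port 36463 is the one kube-proxy listens on, and kube-proxy directs the traffic to the backend Pods.

iptables mode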

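The other mode is iptables, which relies entirely on the kernel's iptables to implement Service proxying and LB. It is the default mode in v1.2 and later, and its principle is shown in the following figure:

[Figure: how iptables mode works]

Because iptables mode uses iptables NAT to do the forwarding, it also carries a performance cost that cannot be ignored. In addition, if the cluster contains tens of thousands of Services/Endpoints, the iptables rules on each node become very large and performance degrades further.

As a result, most enterprises that currently run k8s in production do not use kube-proxy directly as the service proxy. Instead they develop their own solution or use an Ingress Controller to integrate HAProxy or Nginx in place of kube-proxy.

Example

iptables mode is implemented with the NAT forwarding of Linux iptables.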

apiVersion: v1
kind: Service
metadata:
  labels:
    name: mysql
    role: service
  name: mysql-service
spec:
  ports:
    - port: 3306
      targetPort: 3306
      nodePort: 30964
  type: NodePort
  selector:
    mysql-service: "true"

mysql-service exposes nodePort 30964. Its cluster IP (10.254.162.44) uses port 3306, which in turn maps to port 3306 on the backend Pods.

mysql-service proxies two backend Pods, whose IPs are 192.168.125.129 and 192.168.125.131. First, look at the iptables rules.

$ iptables -S -t nat
...
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM
-A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306
-A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306
-A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T

First, a request that arrives through port 30964 on the node enters the following chain:

-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -p tcp -m comment --comment "default/mysql-service:" -m tcp --dport 30964 -j KUBE-SVC-67RL4FN6JRUPOJYM

It then jumps to the KUBE-SVC-67RL4FN6JRUPOJYM chain:

-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ID6YWIT3F6WNZ47P
-A KUBE-SVC-67RL4FN6JRUPOJYM -m comment --comment "default/mysql-service:" -j KUBE-SEP-IN2YML2VIFH5RO2T

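This uses the --probability option of the iptables statistic match, so a connection has a 50% chance of entering the KUBE-SEP-ID6YWIT3F6WNZ47P chain and a 50% chance of entering the KUBE-SEP-IN2YML2VIFH5RO2T chain.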
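With two endpoints the first rule matches with probability 0.5 and the second rule catches everything else. For n endpoints to receive equal shares, rule i (counting from 0) has to match with probability 1/(n-i), because it is only evaluated when the earlier rules did not match. The sketch below works through that arithmetic; `endpointProbabilities` is a hypothetical helper written for this article, not a function from kube-proxy.

```go
package main

import "fmt"

// endpointProbabilities returns the --probability value for each endpoint
// rule in order. The last rule has no statistic match in the rule set, which
// is represented here as probability 1.
func endpointProbabilities(n int) []float64 {
	probs := make([]float64, n)
	for i := range probs {
		// Rule i is reached only if rules 0..i-1 did not match, so it has to
		// choose among the n-i endpoints that are still possible.
		probs[i] = 1.0 / float64(n-i)
	}
	return probs
}

// overallShares converts the per-rule probabilities into the fraction of all
// connections that each endpoint ends up receiving.
func overallShares(probs []float64) []float64 {
	shares := make([]float64, len(probs))
	remaining := 1.0 // fraction of connections that reach the current rule
	for i, p := range probs {
		shares[i] = remaining * p
		remaining *= 1 - p
	}
	return shares
}

func main() {
	for n := 2; n <= 4; n++ {
		probs := endpointProbabilities(n)
		fmt.Println(n, "endpoints, per-rule:", probs, "overall:", overallShares(probs))
	}
}
```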

The KUBE-SEP-ID6YWIT3F6WNZ47P chain sends the request to port 3306 of 192.168.125.129 through DNAT.

-A KUBE-SEP-ID6YWIT3F6WNZ47P -s 192.168.125.129/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-ID6YWIT3F6WNZ47P -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.129:3306

Likewise, the KUBE-SEP-IN2YML2VIFH5RO2T chain sends the request to port 3306 of 192.168.125.131 through DNAT.

-A KUBE-SEP-IN2YML2VIFH5RO2T -s 192.168.125.131/32 -m comment --comment "default/mysql-service:" -j KUBE-MARK-MASQ
-A KUBE-SEP-IN2YML2VIFH5RO2T -p tcp -m comment --comment "default/mysql-service:" -m tcp -j DNAT --to-destination 192.168.125.131:3306

With the nodePort path covered, next comes access through the clusterIP. A request sent directly to port 3306 of the cluster IP (10.254.162.44) jumps straight to KUBE-SVC-67RL4FN6JRUPOJYM.

-A KUBE-SERVICES -d 10.254.162.44/32 -p tcp -m comment --comment "default/mysql-service: cluster IP" -m tcp --dport 3306 -j KUBE-SVC-67RL4FN6JRUPOJYM

From there the jumps are the same as in the NodePort case.

Directory structure of the proxy source code

cmd/kube-proxy //entry point that creates and starts kube-proxy
.
├── app
│   ├── conntrack.go //interface definition for the Linux kernel's nf_conntrack sysctl; for more on conntrack see https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
│   ├── options
│   │   └── options.go //kube-proxy parameter definition ProxyServerConfig and its methods
│   ├── server.go //ProxyServer struct definition, plus the methods that create it (NewProxyServerDefault) and run it (Run)
│   └── server_test.go
└── proxy.go //main function of kube-proxy

pkg/proxy
.
├── OWNERS
├── config
│   ├── api.go //sets up the Reflectors and cache.Store for Services and Endpoints for the proxy
│   ├── api_test.go
│   ├── config.go //defines the ServiceUpdate and EndpointsUpdate structs, plus ServiceConfigHandler and EndpointsConfigHandler to handle Service and Endpoints updates
│   ├── config_test.go
│   └── doc.go
├── doc.go
├── healthcheck //health checks for service listeners and endpoints, and add/delete requests
│   ├── api.go
│   ├── doc.go
│   ├── healthcheck.go
│   ├── healthcheck_test.go
│   ├── http.go
│   ├── listener.go
│   └── worker.go
├── iptables //implementation of proxy mode iptables
│   ├── proxier.go
│   └── proxier_test.go
├── types.go
├── userspace //implementation of proxy mode userspace
│   ├── loadbalancer.go
│   ├── port_allocator.go
│   ├── port_allocator_test.go
│   ├── proxier.go
│   ├── proxier_test.go
│   ├── proxysocket.go
│   ├── rlimit.go
│   ├── rlimit_windows.go
│   ├── roundrobin.go
│   ├── roundrobin_test.go
│   └── udp_server.go
└── winuserspace //implementation of proxy mode userspace on Windows
    ├── loadbalancer.go
    ├── port_allocator.go
    ├── port_allocator_test.go
    ├── proxier.go
    ├── proxier_test.go
    ├── proxysocket.go
    ├── roundrobin.go
    ├── roundrobin_test.go
    └── udp_server.go

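Logic diagram of the internal modules

[Figure: internal module interaction diagram]

Source code analysis

main

The main entry point of kube-proxy is at cmd/kube-proxy/proxy.go:39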

func main() {
    // Create the default config object for kube-proxy.
    config := options.NewProxyConfig()
    // Override the defaults with the kube-proxy command-line flags.
    config.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    // Create a ProxyServer from the config.
    s, err := app.NewProxyServerDefault(config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

    // Call Run so that kube-proxy starts working.
    if err = s.Run(); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }
}

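In main, the two things to focus on are app.NewProxyServerDefault(config), which creates the ProxyServer, and the Run method.

Creating the ProxyServer

NewProxyServerDefault creates a new ProxyServer object from the supplied config. Its code is fairly long and its logic relatively complex, so only the key points are covered below.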

cmd/kube-proxy/app/server.go:131

func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) {
...

// Create a iptables utils.
execer := exec.New()

if runtime.GOOS == "windows" {
    netshInterface = utilnetsh.New(execer)
} else {
    dbus = utildbus.New()
    iptInterface = utiliptables.New(execer, dbus, protocol)
}

...
// Set OOM_SCORE_ADJ.
var oomAdjuster *oom.OOMAdjuster
if config.OOMScoreAdj != nil {
    oomAdjuster = oom.NewOOMAdjuster()
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil {
        glog.V(2).Info(err)
    }
}

...

// Create a Kube Client
...

// Create the event broadcaster and the event recorder.
hostname := nodeutil.GetHostname(config.HostnameOverride)
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "kube-proxy", Host: hostname})

// Define proxier and endpointsHandler, which handle the update events of services and endpoints respectively.
var proxier proxy.ProxyProvider
var endpointsHandler proxyconfig.EndpointsConfigHandler

// Determine the proxy mode from the config.
proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})

// Case: proxy mode is iptables.
if proxyMode == proxyModeIPTables {
    glog.V(0).Info("Using iptables Proxier.")
    if config.IPTablesMasqueradeBit == nil {
        // IPTablesMasqueradeBit must be specified or defaulted.
        return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
    }

    // Call iptables.NewProxier (pkg/proxy/iptables/proxier.go:222) to create the proxier and assign it to both proxier and endpointsHandler defined above, so this one proxier handles the events of both services and endpoints.
    proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname))
    if err != nil {
        glog.Fatalf("Unable to create proxier: %v", err)
    }
    proxier = proxierIPTables
    endpointsHandler = proxierIPTables
    // No turning back. Remove artifacts that might still exist from the userspace Proxier.
    glog.V(0).Info("Tearing down userspace rules.")
    userspace.CleanupLeftovers(iptInterface)
} else {
    // Case: proxy mode is userspace.
    glog.V(0).Info("Using userspace Proxier.")
    // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
    // our config.EndpointsConfigHandler.
    loadBalancer := userspace.NewLoadBalancerRR()
    // set EndpointsConfigHandler to our loadBalancer
    endpointsHandler = loadBalancer

    var proxierUserspace proxy.ProxyProvider

    // On Windows, call winuserspace.NewProxier (pkg/proxy/winuserspace/proxier.go:146) to create the proxier.
    if runtime.GOOS == "windows" {
        proxierUserspace, err = winuserspace.NewProxier(
            loadBalancer,
            net.ParseIP(config.BindAddress),
            netshInterface,
            *utilnet.ParsePortRangeOrDie(config.PortRange),
            // TODO @pires replace below with default values, if applicable
            config.IPTablesSyncPeriod.Duration,
            config.UDPIdleTimeout.Duration,
        )
    } else {
        // On Linux, call userspace.NewProxier (pkg/proxy/userspace/proxier.go:143) to create the proxier.
        proxierUserspace, err = userspace.NewProxier(
            loadBalancer,
            net.ParseIP(config.BindAddress),
            iptInterface,
            *utilnet.ParsePortRangeOrDie(config.PortRange),
            config.IPTablesSyncPeriod.Duration,
            config.IPTablesMinSyncPeriod.Duration,
            config.UDPIdleTimeout.Duration,
        )
    }
    if err != nil {
        glog.Fatalf("Unable to create proxier: %v", err)
    }
    proxier = proxierUserspace
    // Remove artifacts from the pure-iptables Proxier, if not on Windows.
    if runtime.GOOS != "windows" {
        glog.V(0).Info("Tearing down pure-iptables proxy rules.")
        iptables.CleanupLeftovers(iptInterface)
    }
}

// Add iptables reload function, if not on Windows.
if runtime.GOOS != "windows" {
    iptInterface.AddReloadFunc(proxier.Sync)
}

// Create configs (i.e. Watches for Services and Endpoints)
// Create serviceConfig, which is responsible for watchForUpdates on services.
serviceConfig := proxyconfig.NewServiceConfig()

// Register proxier with serviceConfig, i.e. add the listener that handles service updates.
serviceConfig.RegisterHandler(proxier)

// Create endpointsConfig, which is responsible for watchForUpdates on endpoints.
endpointsConfig := proxyconfig.NewEndpointsConfig()

// Register endpointsHandler with endpointsConfig, i.e. add the listener that handles endpoint updates.
endpointsConfig.RegisterHandler(endpointsHandler)

// NewSourceAPI creates config source that watches for changes to the services and endpoints.
// NewSourceAPI lists and watches the Services and Endpoints on the apiserver and periodically refreshes serviceStore and endpointStore.
proxyconfig.NewSourceAPI(
    client.Core().RESTClient(),
    config.ConfigSyncPeriod,
    serviceConfig.Channel("api"), //Service Update Channel
    endpointsConfig.Channel("api"),  //endpoint update channel
)

...

// Build the ProxyServer object from the objects created above.
return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode)

}

I have annotated the core logic of NewProxyServerDefault in the code above. A few places deserve a closer look: proxyconfig.NewServiceConfig, proxyconfig.NewEndpointsConfig, serviceConfig.RegisterHandler, endpointsConfig.RegisterHandler, and proxyconfig.NewSourceAPI.

proxyconfig.NewServiceConfig

We walk through the ServiceConfig code once; the EndpointsConfig code is similar.

pkg/proxy/config/config.go:192

func NewServiceConfig() *ServiceConfig {
    // Create the updates channel.
    updates := make(chan struct{}, 1)

    // Build the serviceStore object.
    store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
    mux := config.NewMux(store)

    // Create a Broadcaster; serviceConfig.RegisterHandler later registers listeners on it.
    bcaster := config.NewBroadcaster()

    // Start a goroutine that begins watching the updates channel right away.
    go watchForUpdates(bcaster, store, updates)

    return &ServiceConfig{mux, bcaster, store}
}

Next, follow watchForUpdates.

pkg/proxy/config/config.go:292

func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
    for true {
        <-updates
        bcaster.Notify(accessor.MergedState())
    }
}

watchForUpdates keeps watching the updates channel. Whenever a value arrives, the Broadcaster immediately notifies the registered listeners. The Notify code is shown below: it passes the data to every listener by calling each listener's OnUpdate method.

pkg/util/config/config.go:133

// Notify notifies all listeners.
func (b *Broadcaster) Notify(instance interface{}) {
    b.listenerLock.RLock()
    listeners := b.listeners
    b.listenerLock.RUnlock()
    for _, listener := range listeners {
        listener.OnUpdate(instance)
    }
}

func (f ListenerFunc) OnUpdate(instance interface{}) {
    f(instance)
}
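The Broadcaster, Listener and ListenerFunc pieces fit together the same way `http.Handler` and `http.HandlerFunc` do. Here is a self-contained miniature of the pattern for experimenting with it; it mirrors the names above but is a simplified sketch, not the code from pkg/util/config, and the `Add` method and the string-slice payload are assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Listener receives the full merged state on every notification.
type Listener interface {
	OnUpdate(instance interface{})
}

// ListenerFunc adapts a plain function to the Listener interface.
type ListenerFunc func(instance interface{})

func (f ListenerFunc) OnUpdate(instance interface{}) { f(instance) }

// Broadcaster fans one notification out to all registered listeners.
type Broadcaster struct {
	mu        sync.RWMutex
	listeners []Listener
}

// Add registers a listener (assumed counterpart of the Add call used by RegisterHandler).
func (b *Broadcaster) Add(l Listener) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners = append(b.listeners, l)
}

// Notify copies the listener slice header under the read lock and then calls
// the listeners without holding the lock, so a slow listener does not block Add.
func (b *Broadcaster) Notify(instance interface{}) {
	b.mu.RLock()
	listeners := b.listeners
	b.mu.RUnlock()
	for _, l := range listeners {
		l.OnUpdate(instance)
	}
}

func main() {
	b := &Broadcaster{}
	b.Add(ListenerFunc(func(v interface{}) {
		// The type assertion plays the role of instance.([]api.Service).
		fmt.Println("services now:", v.([]string))
	}))
	b.Notify([]string{"mysql-service", "ssh-service1"})
}
```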

serviceConfig.RegisterHandler

proxyconfig.NewServiceConfig, analyzed above, creates the ServiceConfig and starts watching the updates channel. When a value is taken from the channel, the Broadcaster notifies its listeners to handle it. serviceConfig.RegisterHandler is what registers a listener with the Broadcaster. Its code is as follows.

pkg/proxy/config/config.go:205

func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
    // Register a listener on the ServiceConfig's Broadcaster.
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnServiceUpdate()")
        handler.OnServiceUpdate(instance.([]api.Service))
    }))
}

As the analysis of proxyconfig.NewServiceConfig showed, when a value is taken from the updates channel, the corresponding ListenerFunc(instance) is eventually called, which here means executing:

    glog.V(3).Infof("Calling handler.OnServiceUpdate()")
    handler.OnServiceUpdate(instance.([]api.Service))

That is, handler.OnServiceUpdate gets called. The proxier of every proxy mode implements the handler.OnServiceUpdate interface. Taking iptables mode as the example, here is its implementation:

pkg/proxy/iptables/proxier.go:428

func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
    ...

    proxier.syncProxyRules()
    proxier.deleteServiceConnections(staleUDPServices.List())
}

So the key logic ends up in proxier.syncProxyRules(), which the internal module interaction diagram above also shows. proxier.syncProxyRules() is discussed in detail later. For now it is enough to know that it synchronizes the services/endpoints cached in the proxy to iptables, generating the corresponding chains and NAT rules.

proxyconfig.NewEndpointsConfig

The endpointsConfig logic is similar to that of serviceConfig, so only the code is given here without further analysis.

pkg/proxy/config/config.go:84

func NewEndpointsConfig() *EndpointsConfig {
    // The updates channel is used to send interrupts to the Endpoints handler.
    // It's buffered because we never want to block for as long as there is a
    // pending interrupt, but don't want to drop them if the handler is doing
    // work.
    updates := make(chan struct{}, 1)
    store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
    mux := config.NewMux(store)
    bcaster := config.NewBroadcaster()
    go watchForUpdates(bcaster, store, updates)
    return &EndpointsConfig{mux, bcaster, store}
}
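The comment on the updates channel describes a common Go idiom: a channel of capacity 1 used as a "something changed" flag, so that bursts of changes collapse into a single wake-up of the consumer. The sketch below shows one way to get that behaviour with a non-blocking send. The `signal` helper and the select/default form are assumptions made for illustration; how the v1.5 stores actually write to the channel is not shown in this article.

```go
package main

import "fmt"

// signal records that something changed without ever blocking the caller.
// If a signal is already pending, the new one is dropped: the pending one
// already guarantees that the consumer will re-read the full merged state.
// It reports whether a new signal was queued.
func signal(updates chan<- struct{}) bool {
	select {
	case updates <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	updates := make(chan struct{}, 1)

	fmt.Println(signal(updates)) // buffer empty: queued
	fmt.Println(signal(updates)) // buffer full: coalesced with the pending signal

	<-updates // the consumer (watchForUpdates in the article) wakes up once

	fmt.Println(signal(updates)) // buffer empty again: queued
}
```

Coalescing is safe here because the consumer does not read a delta from the channel. It calls MergedState() and always sees the latest complete state.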

endpointsConfig.RegisterHandler

pkg/proxy/config/config.go:97

func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
    c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
        glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
        handler.OnEndpointsUpdate(instance.([]api.Endpoints))
    }))
}

proxyconfig.NewSourceAPI

proxyconfig.NewSourceAPI is the key piece: it supplies the data source for the service updates channel and the endpoint updates channel. It does so by periodically listing, and continuously watching, all Services and Endpoints in kube-apiserver and sending them to the corresponding channel. The default list period is 15 minutes and can be changed with --config-sync-period. The code is as follows:

func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
    servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything())
    cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()

    endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything())
    cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
}

// NewServiceStore creates an undelta store that expands updates to the store into
// ServiceUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
    fn := func(objs []interface{}) {
        var services []api.Service
        for _, o := range objs {
            services = append(services, *(o.(*api.Service)))
        }
        ch <- ServiceUpdate{Op: SET, Services: services}
    }
    if store == nil {
        store = cache.NewStore(cache.MetaNamespaceKeyFunc)
    }
    return &cache.UndeltaStore{
        Store:    store,
        PushFunc: fn,
    }
}

// NewEndpointsStore creates an undelta store that expands updates to the store into
// EndpointsUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
    fn := func(objs []interface{}) {
        var endpoints []api.Endpoints
        for _, o := range objs {
            endpoints = append(endpoints, *(o.(*api.Endpoints)))
        }
        ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
    }
    if store == nil {
        store = cache.NewStore(cache.MetaNamespaceKeyFunc)
    }
    return &cache.UndeltaStore{
        Store:    store,
        PushFunc: fn,
    }
}
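The "undelta" idea used by both stores is that every change to the store pushes the complete current contents rather than just the item that changed, which is why the updates carry Op: SET. The miniature below illustrates this with plain strings. `undeltaStore` and its methods are hypothetical stand-ins written for this article, not the cache.UndeltaStore API.

```go
package main

import (
	"fmt"
	"sort"
)

// undeltaStore is a hypothetical miniature of the undelta idea: every change
// pushes the complete current contents, not just the delta.
type undeltaStore struct {
	items map[string]string
	push  func(all []string)
}

// snapshot returns all stored values in a stable order.
func (s *undeltaStore) snapshot() []string {
	all := make([]string, 0, len(s.items))
	for _, v := range s.items {
		all = append(all, v)
	}
	sort.Strings(all) // map iteration order is random, so sort for stable output
	return all
}

// Add stores a value and pushes the full state.
func (s *undeltaStore) Add(key, value string) {
	s.items[key] = value
	s.push(s.snapshot())
}

// Delete removes a value and pushes the full state.
func (s *undeltaStore) Delete(key string) {
	delete(s.items, key)
	s.push(s.snapshot())
}

func main() {
	ch := make(chan []string, 4)
	s := &undeltaStore{
		items: map[string]string{},
		push:  func(all []string) { ch <- all },
	}

	s.Add("default/mysql-service", "mysql-service")
	s.Add("default/ssh-service1", "ssh-service1")
	s.Delete("default/mysql-service")
	close(ch)

	// Each message is a full replacement of the receiver's view.
	for all := range ch {
		fmt.Println("SET", all)
	}
}
```

The trade-off is simplicity over efficiency: the receiver never has to reconcile adds and deletes, but each change costs a full copy of the state.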

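The code is simple and needs little explanation.

Calling Run to start working

Once the ProxyServer has been created, its Run method is called to start working. Its main job is to periodically (every 30s by default) synchronize the services/endpoints held by the proxy to iptables, generating the corresponding chains and NAT rules.

cmd/kube-proxy/app/server.go:308

func (s *ProxyServer) Run() error {
...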

// Start up a webserver if requested
if s.Config.HealthzPort > 0 {
    http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprintf(w, "%s", s.ProxyMode)
    })
    configz.InstallHandler(http.DefaultServeMux)
    go wait.Until(func() {
        err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil)
        if err != nil {
            glog.Errorf("Starting health server failed: %v", err)
        }
    }, 5*time.Second, wait.NeverStop)
}

...

// Just loop forever for now...
s.Proxier.SyncLoop()
return nil

}

The key part of Run is simple: it calls SyncLoop() on the proxier. Again taking iptables mode as the example, here is how SyncLoop() is implemented:

pkg/proxy/iptables/proxier.go:416

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
    t := time.NewTicker(proxier.syncPeriod)
    defer t.Stop()
    for {
        <-t.C
        glog.V(6).Infof("Periodic sync")
        proxier.Sync()
    }
}

SyncLoop sets up a ticker that calls proxier.Sync() every 30s by default; the period can be changed with --iptables-sync-period. Next, the code of Sync():

pkg/proxy/iptables/proxier.go:409

// Sync is called to immediately synchronize the proxier state to iptables
func (proxier *Proxier) Sync() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
    proxier.syncProxyRules()
}

As shown, this again ends up calling proxier.syncProxyRules(). The same was true in the previous section on creating the ProxyServer: whenever a watched service/endpoint changes, proxier.syncProxyRules() gets called. So next comes the code of proxier.syncProxyRules().

proxier.syncProxyRules

The proxier.syncProxyRules code below is the implementation for iptables mode. The userspace mode code is not reproduced here.

pkg/proxy/iptables/proxier.go:791

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
if proxier.throttle != nil {
    proxier.throttle.Accept()
}
start := time.Now()
defer func() {
    glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
    glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
    return
}
glog.V(3).Infof("Syncing iptables rules")

// Create and link the kube services chain.
{
    tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT}
    for _, table := range tablesNeedServicesChain {
        if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil {
            glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err)
            return
        }
    }

    tableChainsNeedJumpServices := []struct {
        table utiliptables.Table
        chain utiliptables.Chain
    }{
        {utiliptables.TableFilter, utiliptables.ChainOutput},
        {utiliptables.TableNAT, utiliptables.ChainOutput},
        {utiliptables.TableNAT, utiliptables.ChainPrerouting},
    }
    comment := "kubernetes service portals"
    args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)}
    for _, tc := range tableChainsNeedJumpServices {
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil {
            glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err)
            return
        }
    }
}

// Create and link the kube postrouting chain.
{
    if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil {
        glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err)
        return
    }

    comment := "kubernetes postrouting rules"
    args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)}
    if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil {
        glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err)
        return
    }
}

// Get iptables-save output so we can check for existing chains and rules.
// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
existingFilterChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter)
if err != nil { // if we failed to get any rules
    glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
    existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw)
}

existingNATChains := make(map[utiliptables.Chain]string)
iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT)
if err != nil { // if we failed to get any rules
    glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
    existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw)
}

filterChains := bytes.NewBuffer(nil)
filterRules := bytes.NewBuffer(nil)
natChains := bytes.NewBuffer(nil)
natRules := bytes.NewBuffer(nil)

// Write table headers.
writeLine(filterChains, "*filter")
writeLine(natChains, "*nat")

// Make sure we keep stats for the top-level chains, if they existed
// (which most should have because we created them above).
if chain, ok := existingFilterChains[kubeServicesChain]; ok {
    writeLine(filterChains, chain)
} else {
    writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeServicesChain]; ok {
    writeLine(natChains, chain)
} else {
    writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain))
}
if chain, ok := existingNATChains[kubeNodePortsChain]; ok {
    writeLine(natChains, chain)
} else {
    writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain))
}
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
    writeLine(natChains, chain)
} else {
    writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
}
if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
    writeLine(natChains, chain)
} else {
    writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
}

// Install the kubernetes-specific postrouting rules. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
    "-A", string(kubePostroutingChain),
    "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
    "-m", "mark", "--mark", proxier.masqueradeMark,
    "-j", "MASQUERADE",
}...)

// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark
// value should ever change.
writeLine(natRules, []string{
    "-A", string(KubeMarkMasqChain),
    "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)

// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set

// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[localPort]closeable{}

// Build rules for each service.
for svcName, svcInfo := range proxier.serviceMap {
    protocol := strings.ToLower(string(svcInfo.protocol))

    // Create the per-service chain, retaining counters if possible.
    svcChain := servicePortChainName(svcName, protocol)
    if chain, ok := existingNATChains[svcChain]; ok {
        writeLine(natChains, chain)
    } else {
        writeLine(natChains, utiliptables.MakeChainLine(svcChain))
    }
    activeNATChains[svcChain] = true

    svcXlbChain := serviceLBChainName(svcName, protocol)
    if svcInfo.onlyNodeLocalEndpoints {
        // Only for services with the externalTraffic annotation set to OnlyLocal
        // create the per-service LB chain, retaining counters if possible.
        if lbChain, ok := existingNATChains[svcXlbChain]; ok {
            writeLine(natChains, lbChain)
        } else {
            writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
        }
        activeNATChains[svcXlbChain] = true
    } else if activeNATChains[svcXlbChain] {
        // Cleanup the previously created XLB chain for this service
        delete(activeNATChains, svcXlbChain)
    }

    // Capture the clusterIP.
    args := []string{
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()),
        "-m", protocol, "-p", protocol,
        "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
        "--dport", fmt.Sprintf("%d", svcInfo.port),
    }
    if proxier.masqueradeAll {
        writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
    }
    if len(proxier.clusterCIDR) > 0 {
        writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
    }
    writeLine(natRules, append(args, "-j", string(svcChain))...)

    // Capture externalIPs.
    for _, externalIP := range svcInfo.externalIPs {
        // If the "external" IP happens to be an IP that is local to this
        // machine, hold the local port open so no other process can open it
        // (because the socket might open but it would never work).
        if local, err := isLocalIP(externalIP); err != nil {
            glog.Errorf("can't determine if IP is local, assuming not: %v", err)
        } else if local {
            lp := localPort{
                desc:     "externalIP for " + svcName.String(),
                ip:       externalIP,
                port:     svcInfo.port,
                protocol: protocol,
            }
            if proxier.portsMap[lp] != nil {
                glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                replacementPortsMap[lp] = proxier.portsMap[lp]
            } else {
                socket, err := proxier.portMapper.OpenLocalPort(&lp)
                if err != nil {
                    glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err)
                    continue
                }
                replacementPortsMap[lp] = socket
            }
        } // We're holding the port, so it's OK to install iptables rules.
        args := []string{
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()),
            "-m", protocol, "-p", protocol,
            "-d", fmt.Sprintf("%s/32", externalIP),
            "--dport", fmt.Sprintf("%d", svcInfo.port),
        }
        // We have to SNAT packets to external IPs.
        writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)

        // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container)
        // nor from a local process to be forwarded to the service.
        // This rule roughly translates to "all traffic from off-machine".
        // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later.
        externalTrafficOnlyArgs := append(args,
            "-m", "physdev", "!", "--physdev-is-in",
            "-m", "addrtype", "!", "--src-type", "LOCAL")
        writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...)
        dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
        // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
        // This covers cases like GCE load-balancers which get added to the local routing table.
        writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
    }

    // Capture load-balancer ingress.
    for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
        if ingress.IP != "" {
            // create service firewall chain
            fwChain := serviceFirewallChainName(svcName, protocol)
            if chain, ok := existingNATChains[fwChain]; ok {
                writeLine(natChains, chain)
            } else {
                writeLine(natChains, utiliptables.MakeChainLine(fwChain))
            }
            activeNATChains[fwChain] = true
            // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
            // This currently works for loadbalancers that preserves source ips.
            // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.

            args := []string{
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
                "-m", protocol, "-p", protocol,
                "-d", fmt.Sprintf("%s/32", ingress.IP),
                "--dport", fmt.Sprintf("%d", svcInfo.port),
            }
            // jump to service firewall chain
            writeLine(natRules, append(args, "-j", string(fwChain))...)

            args = []string{
                "-A", string(fwChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()),
            }

            // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
            chosenChain := svcXlbChain
            // If we are proxying globally, we need to masquerade in case we cross nodes.
            // If we are proxying only locally, we can retain the source IP.
            if !svcInfo.onlyNodeLocalEndpoints {
                writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
                chosenChain = svcChain
            }

            if len(svcInfo.loadBalancerSourceRanges) == 0 {
                // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
                writeLine(natRules, append(args, "-j", string(chosenChain))...)
            } else {
                // firewall filter based on each source range
                allowFromNode := false
                for _, src := range svcInfo.loadBalancerSourceRanges {
                    writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...)
                    // ignore error because it has been validated
                    _, cidr, _ := net.ParseCIDR(src)
                    if cidr.Contains(proxier.nodeIP) {
                        allowFromNode = true
                    }
                }
                // generally, ip route rule was added to intercept request to loadbalancer vip from the
                // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
                // Need to add the following rule to allow request on host.
                if allowFromNode {
                    writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...)
                }
            }

            // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
            // It means the packet cannot go thru the firewall, then mark it for DROP
            writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
        }
    }

    // Capture nodeports.  If we had more than 2 rules it might be
    // worthwhile to make a new per-service chain for nodeport rules, but
    // with just 2 rules it ends up being a waste and a cognitive burden.
    if svcInfo.nodePort != 0 {
        // Hold the local port open so no other process can open it
        // (because the socket might open but it would never work).
        lp := localPort{
            desc:     "nodePort for " + svcName.String(),
            ip:       "",
            port:     svcInfo.nodePort,
            protocol: protocol,
        }
        if proxier.portsMap[lp] != nil {
            glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
            replacementPortsMap[lp] = proxier.portsMap[lp]
        } else {
            socket, err := proxier.portMapper.OpenLocalPort(&lp)
            if err != nil {
                glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                continue
            }
            if lp.protocol == "udp" {
                proxier.clearUdpConntrackForPort(lp.port)
            }
            replacementPortsMap[lp] = socket
        } // We're holding the port, so it's OK to install iptables rules.

        args := []string{
            "-A", string(kubeNodePortsChain),
            "-m", "comment", "--comment", svcName.String(),
            "-m", protocol, "-p", protocol,
            "--dport", fmt.Sprintf("%d", svcInfo.nodePort),
        }
        if !svcInfo.onlyNodeLocalEndpoints {
            // Nodeports need SNAT, unless they're local.
            writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
            // Jump to the service chain.
            writeLine(natRules, append(args, "-j", string(svcChain))...)
        } else {
            // TODO: Make all nodePorts jump to the firewall chain.
            // Currently we only create it for loadbalancers (#33586).
            writeLine(natRules, append(args, "-j", string(svcXlbChain))...)
        }
    }

    // If the service has no endpoints then reject packets.
    if len(proxier.endpointsMap[svcName]) == 0 {
        writeLine(filterRules,
            "-A", string(kubeServicesChain),
            "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()),
            "-m", protocol, "-p", protocol,
            "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
            "--dport", fmt.Sprintf("%d", svcInfo.port),
            "-j", "REJECT",
        )
        continue
    }

    // Generate the per-endpoint chains.  We do this in multiple passes so we
    // can group rules together.
    // These two slices parallel each other - keep in sync
    endpoints := make([]*endpointsInfo, 0)
    endpointChains := make([]utiliptables.Chain, 0)
    for _, ep := range proxier.endpointsMap[svcName] {
        endpoints = append(endpoints, ep)
        endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip)
        endpointChains = append(endpointChains, endpointChain)

        // Create the endpoint chain, retaining counters if possible.
        if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok {
            writeLine(natChains, chain)
        } else {
            writeLine(natChains, utiliptables.MakeChainLine(endpointChain))
        }
        activeNATChains[endpointChain] = true
    }

    // First write session affinity rules, if applicable.
    if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
        for _, endpointChain := range endpointChains {
            writeLine(natRules,
                "-A", string(svcChain),
                "-m", "comment", "--comment", svcName.String(),
                "-m", "recent", "--name", string(endpointChain),
                "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap",
                "-j", string(endpointChain))
        }
    }

    // Now write loadbalancing & DNAT rules.
    n := len(endpointChains)
    for i, endpointChain := range endpointChains {
        // Balancing rules in the per-service chain.
        args := []string{
            "-A", string(svcChain),
            "-m", "comment", "--comment", svcName.String(),
        }
        if i < (n - 1) {
            // Each rule is a probabilistic match.
            args = append(args,
                "-m", "statistic",
                "--mode", "random",
                "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
        }
        // The final (or only if n == 1) rule is a guaranteed match.
        args = append(args, "-j", string(endpointChain))
        writeLine(natRules, args...)

        // Rules in the per-endpoint chain.
        args = []string{
            "-A", string(endpointChain),
            "-m", "comment", "--comment", svcName.String(),
        }
        // Handle traffic that loops back to the originator with SNAT.
        writeLine(natRules, append(args,
            "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]),
            "-j", string(KubeMarkMasqChain))...)
        // Update client-affinity lists.
        if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
            args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
        }
        // DNAT to final destination.
        args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip)
        writeLine(natRules, args...)
    }

    // The logic below this applies only if this service is marked as OnlyLocal
    if !svcInfo.onlyNodeLocalEndpoints {
        continue
    }

    // Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation
    // TODO - This logic may be combinable with the block above that creates the svc balancer chain
    localEndpoints := make([]*endpointsInfo, 0)
    localEndpointChains := make([]utiliptables.Chain, 0)
    for i := range endpointChains {
        if endpoints[i].localEndpoint {
            // These slices parallel each other; must be kept in sync
            localEndpoints = append(localEndpoints, endpoints[i])
            localEndpointChains = append(localEndpointChains, endpointChains[i])
        }
    }
    // First rule in the chain redirects all pod -> external vip traffic to the
    // Service's ClusterIP instead. This happens whether or not we have local
    // endpoints; only if clusterCIDR is specified
    if len(proxier.clusterCIDR) > 0 {
        args = []string{
            "-A", string(svcXlbChain),
            "-m", "comment", "--comment",
            fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`),
            "-s", proxier.clusterCIDR,
            "-j", string(svcChain),
        }
        writeLine(natRules, args...)
    }

    numLocalEndpoints := len(localEndpointChains)
    if numLocalEndpoints == 0 {
        // Blackhole all traffic since there are no local endpoints
        args := []string{
            "-A", string(svcXlbChain),
            "-m", "comment", "--comment",
            fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()),
            "-j",
            string(KubeMarkDropChain),
        }
        writeLine(natRules, args...)
    } else {
        // Setup probability filter rules only over local endpoints
        for i, endpointChain := range localEndpointChains {
            // Balancing rules in the per-service chain.
            args := []string{
                "-A", string(svcXlbChain),
                "-m", "comment", "--comment",
                fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()),
            }
            if i < (numLocalEndpoints - 1) {
                // Each rule is a probabilistic match.
                args = append(args,
                    "-m", "statistic",
                    "--mode", "random",
                    "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i)))
            }
            // The final (or only if n == 1) rule is a guaranteed match.
            args = append(args, "-j", string(endpointChain))
            writeLine(natRules, args...)
        }
    }
}

// Delete chains no longer in use.
for chain := range existingNATChains {
    if !activeNATChains[chain] {
        chainString := string(chain)
        if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
            // Ignore chains that aren't ours.
            continue
        }
        // We must (as per iptables) write a chain-line for it, which has
        // the nice effect of flushing the chain.  Then we can remove the
        // chain.
        writeLine(natChains, existingNATChains[chain])
        writeLine(natRules, "-X", chainString)
    }
}

// Finally, tail-call to the nodeports chain.  This needs to be after all
// other service portal rules.
writeLine(natRules,
    "-A", string(kubeServicesChain),
    "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
    "-m", "addrtype", "--dst-type", "LOCAL",
    "-j", string(kubeNodePortsChain))

// Write the end-of-table markers.
writeLine(filterRules, "COMMIT")
writeLine(natRules, "COMMIT")

// Sync rules.
// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
filterLines := append(filterChains.Bytes(), filterRules.Bytes()...)
natLines := append(natChains.Bytes(), natRules.Bytes()...)
lines := append(filterLines, natLines...)

glog.V(3).Infof("Restoring iptables rules: %s", lines)
err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
    glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines)
    // Revert new local ports.
    revertPorts(replacementPortsMap, proxier.portsMap)
    return
}

// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
    if replacementPortsMap[k] == nil {
        v.Close()
    }
}
proxier.portsMap = replacementPortsMap

}
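
The "Delete chains no longer in use" step near the end of the method can be looked at in isolation. The following is a minimal sketch, not the kube-proxy implementation: `staleChains`, `ownedPrefixes`, and the chain names are hypothetical and exist only for illustration. It assumes that a chain is removed only when it is both inactive and carries one of the four kube-proxy prefixes, which is what the prefix check in the method suggests.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// ownedPrefixes lists the chain-name prefixes that the method above treats
// as belonging to kube-proxy. The variable itself is hypothetical.
var ownedPrefixes = []string{"KUBE-SVC-", "KUBE-SEP-", "KUBE-FW-", "KUBE-XLB-"}

// staleChains returns the chains that exist in the nat table, are not marked
// active in this sync, and carry one of the owned prefixes. Chains created
// by other tools are never returned.
func staleChains(existing []string, active map[string]bool) []string {
	var stale []string
	for _, chain := range existing {
		if active[chain] {
			continue // still referenced by a current service or endpoint
		}
		owned := false
		for _, prefix := range ownedPrefixes {
			if strings.HasPrefix(chain, prefix) {
				owned = true
				break
			}
		}
		if owned {
			stale = append(stale, chain)
		}
	}
	sort.Strings(stale) // deterministic order for printing
	return stale
}

func main() {
	existing := []string{"KUBE-SVC-OLD", "KUBE-SEP-LIVE", "DOCKER", "KUBE-FW-OLD"}
	active := map[string]bool{"KUBE-SEP-LIVE": true}
	for _, chain := range staleChains(existing, active) {
		// The real method writes a chain line (which flushes the chain)
		// and then a "-X" line to delete it.
		fmt.Println("-X", chain)
	}
}
```

In this sketch the `DOCKER` chain is left alone because it does not match any owned prefix, and `KUBE-SEP-LIVE` is kept because it is still active.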

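This is a very long method. I had meant to annotate it in more detail, but I was worn out by the time I finished reading it. If you have your own k8s environment, pick a node, dump its iptables rules, and read them side by side with the code above; it becomes much easier to follow. If you do not have an environment, that is fine: see the example in my previous post on how kube-proxy works.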
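
The load-balancing rules in the method above use `--probability 1/(n-i)` for rule `i` and leave the last rule unconditional. A minimal sketch of why this spreads traffic evenly is given below. `balancingRules` and `overallShare` are hypothetical helpers written for this illustration, and the chain names are made up; only the probability formula and the `%0.5f` formatting are taken from the method.

```go
package main

import (
	"fmt"
	"strings"
)

// balancingRules builds one rule per endpoint chain in the same shape as the
// per-service chain rules above: rule i matches with probability 1/(n-i),
// and the final rule has no statistic match, so it always matches.
func balancingRules(svcChain string, endpointChains []string) []string {
	n := len(endpointChains)
	rules := make([]string, 0, n)
	for i, epChain := range endpointChains {
		args := []string{"-A", svcChain}
		if i < n-1 {
			args = append(args,
				"-m", "statistic",
				"--mode", "random",
				"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
		}
		args = append(args, "-j", epChain)
		rules = append(rules, strings.Join(args, " "))
	}
	return rules
}

// overallShare returns the overall chance that a packet is sent to endpoint i
// when the n rules are evaluated top to bottom: it must miss every earlier
// rule and then match rule i.
func overallShare(n, i int) float64 {
	reach := 1.0
	for k := 0; k < i; k++ {
		reach *= 1.0 - 1.0/float64(n-k)
	}
	return reach / float64(n-i)
}

func main() {
	chains := []string{"KUBE-SEP-A", "KUBE-SEP-B", "KUBE-SEP-C"} // hypothetical names
	for _, rule := range balancingRules("KUBE-SVC-EXAMPLE", chains) {
		fmt.Println(rule)
	}
	for i := range chains {
		fmt.Printf("endpoint %d overall share: %.4f\n", i, overallShare(len(chains), i))
	}
}
```

With three endpoints the rules carry probabilities 0.33333 and 0.50000, and the third rule is unconditional. Because a packet reaches the second rule only after missing the first, each endpoint ends up with one third of the traffic.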

Summary

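kube-proxy implements two proxy modes on Linux, userspace and iptables, and one proxy mode on Windows, userspace. kube-proxy periodically lists and watches all Service and Endpoints resources on kube-apiserver and passes them through channels to the corresponding Broadcaster, which notifies the Listeners registered by the Proxier. The List period defaults to 15 minutes and can be set with --config-sync-period. The Listeners implement the OnServiceUpdate and OnEndpointsUpdate interfaces and ultimately call proxier.syncProxyRules() to update iptables. In addition, the Proxy Run method calls proxier.syncProxyRules() periodically to refresh iptables, every 30s by default, which can be set with --iptables-sync-period.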
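
The notification flow described in the summary can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the kube-proxy code: `serviceHandler`, `broadcaster`, `toyProxier`, and `runUpdates` are hypothetical names, services are reduced to plain strings, and the periodic resync is represented by one direct call instead of a timer.

```go
package main

import "fmt"

// serviceHandler is a hypothetical, simplified stand-in for a listener that
// receives the full set of services on every update.
type serviceHandler interface {
	OnServiceUpdate(services []string)
}

// broadcaster fans each update out to every registered listener.
type broadcaster struct {
	listeners []serviceHandler
}

func (b *broadcaster) Add(l serviceHandler) { b.listeners = append(b.listeners, l) }

func (b *broadcaster) Notify(services []string) {
	for _, l := range b.listeners {
		l.OnServiceUpdate(services)
	}
}

// toyProxier records the latest service list and counts how often the rules
// would have been rewritten.
type toyProxier struct {
	services  []string
	syncCount int
}

func (p *toyProxier) OnServiceUpdate(services []string) {
	p.services = append([]string(nil), services...) // keep a private copy
	p.syncProxyRules()
}

// syncProxyRules stands in for the long method above; here it only counts.
func (p *toyProxier) syncProxyRules() { p.syncCount++ }

// runUpdates drains the channel and hands each update to the broadcaster.
func runUpdates(updates <-chan []string, b *broadcaster) {
	for u := range updates {
		b.Notify(u)
	}
}

func main() {
	b := &broadcaster{}
	p := &toyProxier{}
	b.Add(p)

	updates := make(chan []string, 2)
	updates <- []string{"default/redis-master"}
	updates <- []string{"default/redis-master", "default/frontend"}
	close(updates)

	runUpdates(updates, b)
	p.syncProxyRules() // stands in for one periodic resync tick

	fmt.Printf("services=%v syncs=%d\n", p.services, p.syncCount)
}
```

The sketch shows the two triggers the summary names: every update delivered through the channel leads to a sync, and a periodic call performs another sync even when nothing has changed.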
