前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kube-proxy源码解析

kube-proxy源码解析

作者头像
sealyun
发布2019-10-12 16:08:50
7690
发布2019-10-12 16:08:50
举报
文章被收录于专栏:sealyunsealyun

ipvs相对于iptables模式具备较高的性能与稳定性, 本文讲以此模式的源码解析为主,如果想去了解iptables模式的原理,可以去参考其实现,架构上无差别。

kube-proxy主要功能是监听service和endpoint的事件,然后下放代理策略到机器上。底层调用docker/libnetwork, 而libnetwork最终调用了netlink 与netns来实现ipvs的创建等动作

| 初始化配置

代码入口:cmd/kube-proxy/app/server.go Run() 函数

通过命令行参数去初始化proxyServer的配置

代码语言:javascript
复制
proxyServer, err := NewProxyServer(o)
代码语言:javascript
复制
type ProxyServer struct {
    // k8s client
  Client                 clientset.Interface
  EventClient            v1core.EventsGetter

    // ipvs 相关接口
  IptInterface           utiliptables.Interface
  IpvsInterface          utilipvs.Interface
  IpsetInterface         utilipset.Interface

    // 处理同步时的处理器
  Proxier                proxy.ProxyProvider

    // 代理模式,ipvs iptables userspace kernelspace(windows)四种
  ProxyMode              string
    // 配置同步周期
  ConfigSyncPeriod       time.Duration

    // service 与 endpoint 事件处理器
  ServiceEventHandler    config.ServiceHandler
  EndpointsEventHandler  config.EndpointsHandler
}

Proxier是主要入口,抽象了两个函数:

代码语言:javascript
复制
type ProxyProvider interface {
  // Sync immediately synchronizes the ProxyProvider's current state to iptables.
  Sync()
  // 定期执行
  SyncLoop()
}

ipvs 的interface 这个很重要:

代码语言:javascript
复制
type Interface interface {
  // 删除所有规则
  Flush() error
  // 增加一个virtual server
  AddVirtualServer(*VirtualServer) error

  UpdateVirtualServer(*VirtualServer) error
  DeleteVirtualServer(*VirtualServer) error
  GetVirtualServer(*VirtualServer) (*VirtualServer, error)
  GetVirtualServers() ([]*VirtualServer, error)

    // 给virtual server加个realserver, 如 VirtualServer就是一个clusterip realServer就是pod(或者自定义的endpoint)
  AddRealServer(*VirtualServer, *RealServer) error
  GetRealServers(*VirtualServer) ([]*RealServer, error)
  DeleteRealServer(*VirtualServer, *RealServer) error
}

我们在下文再详细看ipvs_linux是如何实现上面接口的

virtual server与realserver, 最重要的是ip:port,然后就是一些代理的模式如sessionAffinity等:

代码语言:javascript
复制
type VirtualServer struct {
  Address   net.IP
  Protocol  string
  Port      uint16
  Scheduler string
  Flags     ServiceFlags
  Timeout   uint32
}

type RealServer struct {
  Address net.IP
  Port    uint16
  Weight  int
}

创建apiserver client

代码语言:javascript
复制
client, eventClient, err := createClients(config.ClientConnection, master)

创建Proxier 这是仅仅关注ipvs模式的proxier

代码语言:javascript
复制
else if proxyMode == proxyModeIPVS {
    glog.V(0).Info("Using ipvs Proxier.")
    proxierIPVS, err := ipvs.NewProxier(
      iptInterface,
      ipvsInterface,
      ipsetInterface,
      utilsysctl.New(),
      execer,
      config.IPVS.SyncPeriod.Duration,
      config.IPVS.MinSyncPeriod.Duration,
      config.IPTables.MasqueradeAll,
      int(*config.IPTables.MasqueradeBit),
      config.ClusterCIDR,
      hostname,
      getNodeIP(client, hostname),
      recorder,
      healthzServer,
      config.IPVS.Scheduler,
    )
...
    proxier = proxierIPVS
    serviceEventHandler = proxierIPVS
    endpointsEventHandler = proxierIPVS

这个Proxier具备以下方法:

代码语言:javascript
复制
   +OnEndpointsAdd(endpoints *api.Endpoints)
   +OnEndpointsDelete(endpoints *api.Endpoints)
   +OnEndpointsSynced()
   +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
   +OnServiceAdd(service *api.Service)
   +OnServiceDelete(service *api.Service)
   +OnServiceSynced()
   +OnServiceUpdate(oldService, service *api.Service)
   +Sync()
   +SyncLoop()

所以ipvs的这个Proxier实现了我们需要的绝大部分接口

小结一下:

代码语言:javascript
复制
     +-----------> endpointHandler
     |
     +-----------> serviceHandler
     |                ^
     |                | +-------------> sync 定期同步等
     |                | |
ProxyServer---------> Proxier --------> service 事件回调
     |                  |
     |                  +-------------> endpoint事件回调
     |                                             |  触发
     +-----> ipvs interface ipvs handler     <-----+

| 启动proxyServer

  1. 检查是不是带了clean up参数,如果带了那么清除所有规则退出
  2. OOM adjuster貌似没实现,忽略
  3. resouceContainer也没实现,忽略
  4. 启动metrics服务器,这个挺重要,比如我们想监控时可以传入这个参数, 包含promethus的 metrics. metrics-bind-address参数
  5. 启动informer, 开始监听事件,分别启动协程处理。

1 2 3 4我们都不用太关注,细看5即可:

代码语言:javascript
复制
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// 注册 service handler并启动
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// 这里面仅仅是把ServiceEventHandler赋值给informer回调
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// 注册endpoint
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

go informerFactory.Start(wait.NeverStop)

serviceConfig.Run与endpointConfig.Run仅仅是给回调函数赋值, 所以注册的handler就给了informer, informer监听到事件时就会回调:

代码语言:javascript
复制
for i := range c.eventHandlers {
  glog.V(3).Infof("Calling handler.OnServiceSynced()")
  c.eventHandlers[i].OnServiceSynced()
}

那么问题来了,注册进去的这个handler是啥?回顾一下上文的

代码语言:javascript
复制
    serviceEventHandler = proxierIPVS
    endpointsEventHandler = proxierIPVS

所以都是这个proxierIPVS

handler的回调函数, informer会回调这几个函数,所以我们在自己开发时实现这个interface注册进去即可:

代码语言:javascript
复制
type ServiceHandler interface {
  // OnServiceAdd is called whenever creation of new service object
  // is observed.
  OnServiceAdd(service *api.Service)
  // OnServiceUpdate is called whenever modification of an existing
  // service object is observed.
  OnServiceUpdate(oldService, service *api.Service)
  // OnServiceDelete is called whenever deletion of an existing service
  // object is observed.
  OnServiceDelete(service *api.Service)
  // OnServiceSynced is called once all the initial even handlers were
  // called and the state is fully propagated to local cache.
  OnServiceSynced()
}

| 开始监听

代码语言:javascript
复制
go informerFactory.Start(wait.NeverStop)

这里执行后,我们创建删除service endpoint等动作都会被监听到,然后回调,回顾一下上面的图,最终都是由Proxier去实现,所以后面我们重点关注Proxier即可

代码语言:javascript
复制
s.Proxier.SyncLoop()

然后开始SyncLoop,下文开讲

| Proxier 实现

我们创建一个service时OnServiceAdd方法会被调用, 这里记录一下之前的状态与当前状态两个东西,然后发个信号给syncRunner让它去处理:

代码语言:javascript
复制
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
  namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
  if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
    proxier.syncRunner.Run()
  }
}

记录service 信息,可以看到没做什么事,就是把service存在map里, 如果没变直接删掉map信息不做任何处理:

代码语言:javascript
复制
change, exists := scm.items[*namespacedName]
if !exists {
  change = &serviceChange{}
    // 老的service信息
  change.previous = serviceToServiceMap(previous)
  scm.items[*namespacedName] = change
}
// 当前监听到的service信息
change.current = serviceToServiceMap(current)

如果一样,直接删除
if reflect.DeepEqual(change.previous, change.current) {
  delete(scm.items, *namespacedName)
}

proxier.syncRunner.Run() 里面就发送了一个信号

代码语言:javascript
复制
select {
  case bfr.run <- struct{}{}:
  default:
}

这里面处理了这个信号

代码语言:javascript
复制
s.Proxier.SyncLoop()

func (proxier *Proxier) SyncLoop() {
  // Update healthz timestamp at beginning in case Sync() never succeeds.
  if proxier.healthzServer != nil {
    proxier.healthzServer.UpdateTimestamp()
  }
  proxier.syncRunner.Loop(wait.NeverStop)
}

runner里收到信号执行,没收到信号会定期执行:

代码语言:javascript
复制
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
  glog.V(3).Infof("%s Loop running", bfr.name)
  bfr.timer.Reset(bfr.maxInterval)
  for {
    select {
    case <-stop:
      bfr.stop()
      glog.V(3).Infof("%s Loop stopping", bfr.name)
      return
    case <-bfr.timer.C():  // 定期执行
      bfr.tryRun()
    case <-bfr.run:
      bfr.tryRun()       // 收到事件信号执行
    }
  }
}

这个bfr runner里我们最需要主意的是一个回调函数,tryRun里检查这个回调是否满足被调度的条件:

代码语言:javascript
复制
type BoundedFrequencyRunner struct {
  name        string        // the name of this instance
  minInterval time.Duration // the min time between runs, modulo bursts
  maxInterval time.Duration // the max time between runs

  run chan struct{} // try an async run

  mu      sync.Mutex  // guards runs of fn and all mutations
  fn      func()      // function to run, 这个回调
  lastRun time.Time   // time of last run
  timer   timer       // timer for deferred runs
  limiter rateLimiter // rate limiter for on-demand runs
}

// 传入的proxier.syncProxyRules这个函数
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

这是个600行左右的搓逼函数,也是处理主要逻辑的地方。

| syncProxyRules

  1. 设置一些iptables规则,如mark与comment
  2. 确定机器上有网卡,ipvs需要绑定地址到上面
  3. 确定有ipset,ipset是iptables的扩展,可以给一批地址设置iptables规则 …(又臭又长,重复代码多,看不下去了,细节问题自己去看吧)
  4. 我们最关注的,如何去处理VirtualServer的
代码语言:javascript
复制
serv := &utilipvs.VirtualServer{
  Address:   net.ParseIP(ingress.IP),
  Port:      uint16(svcInfo.port),
  Protocol:  string(svcInfo.protocol),
  Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
  if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
  }
}

看下实现, 如果没有就创建,如果已存在就更新, 给网卡绑定service的cluster ip:

代码语言:javascript
复制
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
  appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
  if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
    if appliedVirtualServer == nil {
      if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
        return err
      }
    } else {
      if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
        return err
      }
    }
  }

  // bind service address to dummy interface even if service not changed,
  // in case that service IP was removed by other processes
  if bindAddr {
    _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
    if err != nil {
      return err
    }
  }
  return nil
}

| 创建service实现

现在可以去看ipvs的AddVirtualServer的实现了,主要是利用socket与内核进程通信做到的。 pkg/util/ipvs/ipvs_linux.go 里 runner结构体实现了这些方法, 这里用到了 docker/libnetwork/ipvs库:

代码语言:javascript
复制
// runner implements Interface.
type runner struct {
  exec       utilexec.Interface
  ipvsHandle *ipvs.Handle
}

// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
  ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
  if err != nil {
    glog.Errorf("IPVS interface can't be initialized, error: %v", err)
    return nil
  }
  return &runner{
    exec:       exec,
    ipvsHandle: ihandle,
  }
}

New的时候创建了一个特殊的socket, 这里与我们普通的socket编程无差别,关键是syscall.AF_NETLINK这个参数,代表与内核进程通信:

代码语言:javascript
复制
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
  fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
  if err != nil {
    return nil, err
  }
  s := &NetlinkSocket{
    fd: int32(fd),
  }
  s.lsa.Family = syscall.AF_NETLINK
  if err := syscall.Bind(fd, &s.lsa); err != nil {
    syscall.Close(fd)
    return nil, err
  }

  return s, nil
}

创建一个service, 转换成docker service格式,直接调用:

代码语言:javascript
复制
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
  eSvc, err := toBackendService(vs)
  if err != nil {
    return err
  }
  return runner.ipvsHandle.NewService(eSvc)
}

然后就是把service信息打包,往socket里面写即可:

代码语言:javascript
复制
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
  req := newIPVSRequest(cmd)
  req.Seq = atomic.AddUint32(&i.seq, 1)

  if s == nil {
    req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
    req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
  } else {
    req.AddData(fillService(s))
  } // 把service塞到请求中

  if d == nil {
    if cmd == ipvsCmdGetDest {
      req.Flags |= syscall.NLM_F_DUMP
    }

  } else {
    req.AddData(fillDestinaton(d))
  }

    // 给内核进程发送service信息
  res, err := execute(i.sock, req, 0)
  if err != nil {
    return [][]byte{}, err
  }

  return res, nil
}

构造请求

代码语言:javascript
复制
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
  return newGenlRequest(ipvsFamily, cmd)
}

在构造请求时传入的是ipvs协议簇

然后构造一个与内核通信的消息头

代码语言:javascript
复制
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
  return &NetlinkRequest{
    NlMsghdr: syscall.NlMsghdr{
      Len:   uint32(syscall.SizeofNlMsghdr),
      Type:  uint16(proto),
      Flags: syscall.NLM_F_REQUEST | uint16(flags),
      Seq:   atomic.AddUint32(&nextSeqNr, 1),
    },
  }
}

给消息加Data,这个Data是个数组,需要实现两个方法:

代码语言:javascript
复制
type NetlinkRequestData interface {
  Len() int  // 长度
  Serialize() []byte // 序列化, 内核通信也需要一定的数据格式,service信息也需要实现
}

比如 header是这样序列化的, 一看愣住了,思考好久才看懂:拆下看:(*[unsafe.Sizeof(hdr)]byte) 一个[]byte类型,长度就是结构体大小 (unsafe.Pointer(hdr))把结构体转成byte指针类型 加个*取它的值 用[:]转成byte返回

代码语言:javascript
复制
func (hdr *genlMsgHdr) Serialize() []byte {
  return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}

发送service信息给内核

一个很普通的socket发送接收数据

代码语言:javascript
复制
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
  var (
    err error
  )

  if err := s.Send(req); err != nil {
    return nil, err
  }

  pid, err := s.GetPid()
  if err != nil {
    return nil, err
  }

  var res [][]byte

done:
  for {
    msgs, err := s.Receive()
    if err != nil {
      return nil, err
    }
    for _, m := range msgs {
      if m.Header.Seq != req.Seq {
        continue
      }
      if m.Header.Pid != pid {
        return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
      }
      if m.Header.Type == syscall.NLMSG_DONE {
        break done
      }
      if m.Header.Type == syscall.NLMSG_ERROR {
        error := int32(native.Uint32(m.Data[0:4]))
        if error == 0 {
          break done
        }
        return nil, syscall.Errno(-error)
      }
      if resType != 0 && m.Header.Type != resType {
        continue
      }
      res = append(res, m.Data)
      if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
        break done
      }
    }
  }
  return res, nil
}

| 总结

Service总体来讲代码比较简单,但是觉得有些地方实现的有点绕,不够简单直接,还有一个函数八百多行的有点不清晰。总体来说就是监听apiserver事件,然后比对处理,定期也会去执行同步策略.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-10-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 sealyun 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • | 初始化配置
  • | 启动proxyServer
  • | 开始监听
  • | Proxier 实现
  • | syncProxyRules
  • | 创建service实现
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档