前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Serverless 云原生网关 Gloo (下篇)

Serverless 云原生网关 Gloo (下篇)

作者头像
zouyee
发布2022-05-25 10:06:17
1.8K0
发布2022-05-25 10:06:17
举报
文章被收录于专栏:Kubernetes GO

Gloo基于Envoy实现,致力于成为下一代API网关标杆产品,其在函数级路由等方面表现优异,对旧式应用、微服务和serverless提供支持;与云原生标杆的开源项目(如Envoy、Kubernetes等)紧密集成。Gloo凭借生态支持的多样性、多云架构的灵活性,在云原生网关starups中脱颖而出。

文|董琪

编辑|zouyee

技术深度|适中

  • Kubernetes ingress controller: 当部署在Kubernetes上时,Gloo可以充当功能丰富的入口控制器,并且当部署到AWS EKS等公共云时,可以简化路由功能。
  • 异构应用: Gloo创建的应用程序路由到实现为微服务,无服务器功能和旧式应用程序的后端。此功能可以帮助用户逐渐从旧代码迁移到微服务、无服务器架构。用户在保持其原有代码逻辑前提下,使用云原生技术添加新功能;组织中的不同团队可以选择不同架构及其他场景。
  • 服务网格Ingress: 服务网格技术解决了跨云网络的服务到服务通信问题。可以使用服务网格解决诸如服务标识,七层网络遥测信息收集,服务弹性,服务之间的流量路由以及策略执行(例如配额,速率限制等)之类的问题。为了使服务网格正常运行,它需要一种使流量进入网格的方法。将流量从边缘传输到集群内部的问题与服务到服务的问题有所不同。前端网关应当提供缓存,安全性和流量管理,Oauth和最终用户身份验证/授权,用户速率限制,Web应用程序防火墙等。Gloo解决了以上问题,兼容所有服务网格技术,包括Istio,Linkerd,Consul Connect和AWS App Mesh。 :关于Gloo的相关介绍,参看前文五分钟初识Gloo,Gloo版本1.3.34

Gloo分析

Gloo 的运作由多个独立组件构成,为了方便日后运维和性能优化,有必要了解其具体实现机制。首先,本文会讲解各组件共用的代码部分,即支持动态更新的启动机制。之后再介绍各组件差异性功能的代码实现。

源码结构

Gloo 支持 Gateway,Knative,Ingress 三种模式运行,不同模式所需使用的组件也不同,其中 Knative 模式使用Discovery,EnvoyInit,Gloo,Ingress 四个组件,目录结构如下:

代码语言:javascript
复制
➜  projects git:(master) tree -L 1
.
├── accesslogger
├── clusteringress
├── discovery
├── envoyinit
├── examples
├── gateway
├── gloo
├── hypergloo
├── ingress
├── knative
└── sds

Serverless 中 Gloo 就使用的 Knative 模式,下面将针对 Knative 模式的 Gloo 进行源码讲解。

启动分析

Discovery,Ingress,Gloo 三个组件的启动代码类似,首先会定义处理实际业务逻辑的函数 SetupFunc,这部分各组件不同,之后都会监听 Setting CR(Custome Resource,自定义资源),该 CR 定义了组件的配置信息。当 Setting CR 被修改,SetupFunc 会根据新的 Setting CR 自动重启,达到动态更新配置,而不用重启组件 Pod 的目的。

组件的启动流程如上图所示,下面以 Ingress 组件的启动代码为例做具体分析。

1)main

main 函数作为程序的入口函数,调用 setup.Main

代码语言:javascript
复制
# projects/ingress/cmd/main.go
func main() {
  if err := setup.Main(nil); err != nil {
        log.Fatalf("err in main: %v", err.Error())
    }
}

setup.Main中主要看两点:

1. setuputils.Main 函数,该函数是实现动态配置更新的入口函数

2. Setup 函数变量,该函数是用户自定义业务逻辑的入口函数

代码语言:javascript
复制
# pkg/utils/setuputils/main_setup.go
func Main(customCtx context.Context) error {
    return setuputils.Main(setuputils.SetupOpts{
        LoggerName:  "ingress",
        Version:     version.Version,
        SetupFunc:   Setup,
        ExitOnError: true,
        CustomCtx:   customCtx,
    })

}

2) NewSetupEventLoop

实际入口 Main 函数中,我们需要关注 Setting 资源的监听和自定义业务逻辑的启动。

代码语言:javascript
复制
# pkg/utils/setuputils/main_setup.go
func Main(opts SetupOpts) error {
    //....
    emitter := v1.NewSetupEmitter(settingsClient)
    settingsRef := core.ResourceRef{Namespace: setupNamespace, Name: setupName}
    eventLoop := v1.NewSetupEventLoop(emitter, NewSetupSyncer(settingsRef, opts.SetupFunc))
    errs, err := eventLoop.Run([]string{setupNamespace}, clients.WatchOpts{
        Ctx:         ctx,
        RefreshRate: time.Second,
         })
    //...
    return nil
}

NewSetupEmitter 函数以 Setting 客户端为参数,返回 SetupEmitter 接口,该接口嵌套 SetupSnapshotEmitter 接口并实现 Snapshots 方法。Snapshots 方法会监听 Setting 资源,并返回 channel 通道,并确保通道中的 Setting 资源最新。

代码语言:javascript
复制
# projects/gloo/pkg/api/v1/setup_snapshot_emitter.sk.go
type SetupSnapshotEmitter interface {
    Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *SetupSnapshot, <-chan error, error)
}
type SetupEmitter interface {
    SetupSnapshotEmitter
    Register() error
    Settings() SettingsClient
}
func NewSetupEmitter(settingsClient SettingsClient) SetupEmitter {
    return NewSetupEmitterWithEmit(settingsClient, make(chan struct{}))
}
func NewSetupEmitterWithEmit(settingsClient SettingsClient, emit <-chan struct{}) SetupEmitter {
    return &setupEmitter{
        settings:  settingsClient,
        forceEmit: emit,
    }
}

NewSetupEventLoop 负责业务逻辑函数的封装,其第二个参数为 NewSetupSyncer 函数的返回值。NewSetupSyncer 函数将 opts.SetupFunc 函数变量封装在 SetupSyncer结构体中。而 SetupSyncer结构体实现了 Sync 方法(之后 Run 函数中被调用),该方法中s.setupFunc 函数变量就是之前封装的业务逻辑处理函数 opts.SetupFunc。

代码语言:javascript
复制
# pkg/utils/setuputils/setup_syncer.go
func NewSetupSyncer(settingsRef core.ResourceRef, setupFunc SetupFunc) *SetupSyncer {
  return &SetupSyncer{
    settingsRef:  settingsRef,
    setupFunc:   setupFunc,
    inMemoryCache: memory.NewInMemoryResourceCache(),
  }
}
func (s *SetupSyncer) Sync(ctx context.Context, snap *v1.SetupSnapshot) error {
  settings, err := snap.Settings.Find(s.settingsRef.Strings())
  //...
  return s.setupFunc(ctx, kube.NewKubeCache(ctx), s.inMemoryCache, settings)
}

3) Run 函数

Run 函数用于启动之前的监听逻辑和组件自定义的业务逻辑。Snapshots 会监听 Setting 资源并将最新的资源通过 channel watch 返回;goroutine 中获取到最新的 snapshot 后调用 Sync 函数来启动组件自定义的业务逻辑。

代码语言:javascript
复制
# projects/gloo/pkg/api/v1/setup_event_loop.sk.go
func (el *setupEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
  //...
  watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
  go func() {
    //...
    for {
      select {
      case snapshot, ok := <-watch:
        //...
        ctx, canc := context.WithCancel(ctx)
        cancel = canc
        err := el.syncer.Sync(ctx, snapshot)
        //...
      case <-opts.Ctx.Done():
        return
      }
    }
  }()
  return errs, nil
}

4) Snapshots 函数

el.emitter.Snapshots 函数首先通过 List 函数获取 Setting 资源作为初始化列表,并通过Watch函数监听资源获取最新列表,之后通过 Hash 函数对比判断资源是否发生变化,如果发生变化则通过 channel 通道返回最新的 Setting 资源,否则继续监听。

代码语言:javascript
复制
# projects/gloo/pkg/api/v1/setup_snapshot_emitter.sk.go
func (c *setupEmitter) Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *SetupSnapshot, <-chan error, error) {
  //......
  for _, namespace := range watchNamespaces {
    // c.settings.List 列出所有 namespace 下的 Setting 资源
    {
      settings, err := c.settings.List(namespace, clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
      if err != nil {
        return nil, nil, errors.Wrapf(err, "initial Settings list")
      }
      initialSettingsList = append(initialSettingsList, settings...)
    }
    // c.settings.Watch 监听 Setting 资源的变化,返回最新资源
    settingsNamespacesChan, settingsErrs, err := c.settings.Watch(namespace, opts)
    if err != nil {
      return nil, nil, errors.Wrapf(err, "starting Settings watch")
    }
    // 对新的资源进行封装
    go func(namespace string) {
      for {
        select {
        case <-ctx.Done():
          return
        case settingsList := <-settingsNamespacesChan:
          select {
          case <-ctx.Done():
            return
          case settingsChan <- settingsListWithNamespace{list: settingsList, namespace: namespace}:
          }
        }
      }
    }(namespace)
  }
  // 对最开始 List 返回的 Setting 资源排序,用于之后的 Hash 比较
  currentSnapshot.Settings = initialSettingsList.Sort()
  snapshots := make(chan *SetupSnapshot)
  go func() {
    // sent initial snapshot to kick off the watch
    initialSnapshot := currentSnapshot.Clone()
    snapshots <- &initialSnapshot
    timer := time.NewTicker(time.Second * 1)
    previousHash, err := currentSnapshot.Hash(nil)
    // 定义 sync 函数变量,对比 currentHash 和 previousHash Hash 值
    // 相同则直接返回,不同的话会将当前最新资源发送到 snapshots 通道中
    // snapshots 通道作为 Snapshots 方法的返回值,传递给调用者
    sync := func() {
      currentHash, err := currentSnapshot.Hash(nil)
      // 第一次运行时,currentHash 和 previousHash 相同,直接返回
      if previousHash == currentHash {
        return
      }
      sentSnapshot := currentSnapshot.Clone()
      select {
      case snapshots <- &sentSnapshot:
        previousHash = currentHash
      
    }
    settingsByNamespace := make(map[string]SettingsList)

    for {
      select {

      // 定时调用 sync 函数确保将已变化的资源返回给 Snapshots 方法的调用者。
      ...

      case settingsNamespacedList := <-settingsChan:
        namespace := settingsNamespacedList.namespace

        // merge lists by namespace
        settingsByNamespace[namespace] = settingsNamespacedList.list
        var settingsList SettingsList
        for _, settings := range settingsByNamespace {
          settingsList = append(settingsList, settings...)
        }
        // 从 settingsChan 通道中获取最新的资源,并赋值给 currentSnapshot 结构体
        // 之后将在 sync 环境变量中计算 currentHash 并于 previsouHahs 比较
        currentSnapshot.Settings = settingsList.Sort()
      }
    }
  }()
  return snapshots, errs, nil
}

5) Sync 函数

el.syncer.Sync 函数中会调用实际的业务逻辑函数,s.setupFunc 函数。该函数变量是在 NewSetupEventLoop 是进行赋值的。

代码语言:javascript
复制
# pkg/utils/setuputils/setup_syncer.go
func (s *SetupSyncer) Sync(ctx context.Context, snap *v1.SetupSnapshot) error {
  settings, err := snap.Settings.Find(s.settingsRef.Strings())
  //...
  return s.setupFunc(ctx, kube.NewKubeCache(ctx), s.inMemoryCache, settings)
}

同时 el.syncer.Sync 函数运行在 Run 函数中的goroutine 中,当 Setting资源发生变化时,就会触发 el.syncer.Sync 函数的重新调用,从而达到动态更新配置,而不用重启组件 Pod 的目的。

代码语言:javascript
复制
# projects/gloo/pkg/api/v1/setup_event_loop.sk.go

func (el *setupEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
  //.....
  watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
  go func() {
    //.....
    for {
      select {
      case snapshot, ok := <-watch:
        err := el.syncer.Sync(ctx, snapshot)
        //.....
    }
  }()
  return errs, nil
}

至此,组件的启动代码分析完毕,下面将介绍 SetupFunc 函数,即各组件业务逻辑的部分。

ingress组件

Ingress 组件的业务逻辑是监听 Knative 环境中的 KIngress 资源,并根据 KIngress 资源创建维护 Proxy 资源。此外,成功创建或更新 Proxy 资源后 Ingress 组件会更新 KIngress 资源的 status 状态,以此告知 Knative 的 Controller 组件该 KIngress 路由信息已经成功配置,Knative 获取该状态后会逐层上报至 Route,KSVC 等资源。

代码语言:javascript
复制
# projects/ingress/pkg/setup/setup_syncer.go

func Setup(ctx context.Context, kubeCache kube.SharedCache, inMemoryCache memory.InMemoryResourceCache, settings *gloov1.Settings) error {
  //.....
  return RunIngress(opts)
}

RunIngress 函数中 knativeTranslatorSync 结构体变量非常重要,其自带函数变量 translateProxy用于生成 Proxy 资源,后面将详细介绍。RunIngress 函数最终会调用 Run 函数。

代码语言:javascript
复制
# projects/ingress/pkg/setup/setup_syncer.go

func RunIngress(opts Opts) error {
  ...
  if opts.EnableKnative {
    // 兼容旧版 Knative
    if pre080knativeVersion(opts.KnativeVersion) {
      ...
    } else {
      ...
      knativeTranslatorEmitter := knativev1.NewTranslatorEmitter(ingressClient)
      knativeTranslatorSync := knativetranslator.NewSyncer(
      ...
      )
      knativeTranslatorEventLoop := knativev1.NewTranslatorEventLoop(knativeTranslatorEmitter, knativeTranslatorSync)
      knativeTranslatorEventLoopErrs, err := knativeTranslatorEventLoop.Run(opts.WatchNamespaces, opts.WatchOpts)
      ...
    }
  }
  ...
}

Run 函数内部逻辑与启动代码类似,只不过这里的 Run 函数实现的是 EventLoop 接口。Snapshots 函数这次监听处理的不再是 Setting 资源,而是Ingress资源(ingresses.networking.internal.knative.dev,区别于 K8s中的 Ingress),其余逻辑相同。

代码语言:javascript
复制
# projects/knative/pkg/api/v1/translator_event_loop.sk.go

func (el *translatorEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
  ...
  watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
  ...
  select {
    case snapshot, ok := <-watch:
        err := el.syncer.Sync(ctx, snapshot)     
        ...
}

这里的 el.syncer.Sync 实现的是 TranslatorSyncer接口,且内部逻辑与启动代码中的也不同。具体如下:

  1. Sync 函数首先通过 translateProxy 函数构造出新的Proxy 资源。
  2. Reconcile 函数将构造出的 Proxy 资源利用创建、更新等方法同步到 K8s集群环境中。
  3. propagateProxyStatus 函数更新 Ingress 资源的status 状态。

下面将对每个步骤做具体分析。

代码语言:javascript
复制
# projects/knative/pkg/translator/translator_syncer.go

func (s *translatorSyncer) Sync(ctx context.Context, snap *v1.TranslatorSnapshot) error {
  ...
  externalProxy, err := s.translateProxy(ctx, externalProxyName, s.writeNamespace, externalIngresses)
  ...
  if err := s.proxyReconciler.Reconcile(s.writeNamespace, desiredResources, utils.TransitionFunction, clients.ListOpts{
    Ctx:    ctx,
    Selector: labels,
  }); err != nil {
    return err
  }
  g := &errgroup.Group{}
  g.Go(func() error {
    if err := s.propagateProxyStatus(ctx, externalProxy, externalIngresses); err != nil {
      return eris.Wrapf(err, "failed to propagate external proxy status "+
        "to ingress objects")
    }
    return nil
  })
  //......
}

translateProxy 函数即是之前 RunIngress 函数中 knativeTranslatorSync 结构体变量中的函数变量。可以看出其通过 routingConfig 函数处理 Ingress 资源并最终构建 Proxy 资源。逻辑比较简单,不做详细介绍。

代码语言:javascript
复制
# projects/knative/pkg/translator/translate.go

func translateProxy(ctx context.Context, proxyName, proxyNamespace string, ingresses v1alpha1.IngressList) (*gloov1.Proxy, error) {
  ingressSpecsByRef := make(map[*core.Metadata]knativev1alpha1.IngressSpec)
  for _, ing := range ingresses {
    meta := ing.GetMetadata()
    ingressSpecsByRef[&meta] = ing.Spec
  }
  return TranslateProxyFromSpecs(ctx, proxyName, proxyNamespace, ingressSpecsByRef)
}
...
// made public to be shared with the (soon to be deprecated) clusteringress controller
func TranslateProxyFromSpecs(ctx context.Context, proxyName, proxyNamespace string, ingresses map[core.Metadata]knativev1alpha1.IngressSpec) (gloov1.Proxy, error) {
  // 处理 Ingress 资源
  virtualHostsHttp, virtualHostsHttps, sslConfigs, err := routingConfig(ctx, ingresses)
  ...
  // 构造 Proxy 资源
  return &gloov1.Proxy{
    Metadata: core.Metadata{
      Name:    proxyName, // must match envoy role
      Namespace: proxyNamespace,
    },
    Listeners: listeners,
  }, nil
}

Reconcile 将构建出的 Proxy 资源同步到环境中去。可以看出 Reconcile函数的具体实现在 r.base.Reconcile 中。

代码语言:javascript
复制
# projects/gloo/pkg/api/v1/proxy_reconciler.sk.go

func (r *proxyReconciler) Reconcile(namespace string, desiredResources ProxyList, transition TransitionProxyFunc, opts clients.ListOpts) error {
  //......
  return r.base.Reconcile(namespace, proxysToResources(desiredResources), transitionResources, opts)
}

r.base.Reconcile 是第三方 github.com/solo-io/solo-kit/pkg/api/v1/reconcile 库中的函数。

代码语言:javascript
复制
func (r *reconciler) Reconcile(namespace string, desiredResources resources.ResourceList, transition TransitionResourcesFunc, opts clients.ListOpts) error {
  opts = opts.WithDefaults()
  opts.Ctx = contextutils.WithLogger(opts.Ctx, "reconciler")
  originalResources, err := r.rc.List(namespace, opts)
  if err != nil {
    return err
  }
for _, desired := range desiredResources {
  // 创建或更新资源
    if err := r.syncResource(opts.Ctx, desired, originalResources, transition); err != nil {
      return errors.Wrapf(err, "reconciling resource %v", desired.GetMetadata().Name)
    }
  }
  // 删除资源
  for _, original := range originalResources {
    unused := findResource(original.GetMetadata().Namespace, original.GetMetadata().Name, desiredResources) == nil
    if unused {
      if err := deleteStaleResource(opts.Ctx, r.rc, original); err != nil {
        return errors.Wrapf(err, "deleting stale resource %v", original.GetMetadata().Name)
      }
    }
  }
  return nil
}

其中 r.rc.List 通过 opts 中特定的 labels 去获取当前环境中的 Proxy 资源,labels 即之前 Sync 方法中定义的map结构体,之后 syncResource 方法最终会调用 ResourceClient 的 Write 方法负责新建或更新资源,deleteStaleResource 会调用 ResourceClient 的 Delete 方法负责删除资源。调用链如下图所示。

propagateProxyStatus 函数用于更新 Ingress status 状态。当 Proxy 资源被 Gloo 组件成功获取后,Gloo 组件会修改 Proxy 资源的 status 状态,此时会调用 markIngressesReady 函数。markIngressesReady 最终会调用 UpdateStatus 函数去更新 Ingress 资源的 status 状态。

代码语言:javascript
复制
# projects/knative/pkg/translator/translator_syncer.go

//propagate to all ingresses the status of the proxy

func (s *translatorSyncer) propagateProxyStatus(ctx context.Context, proxy *gloov1.Proxy, ingresses v1alpha1.IngressList) error {
  ...
  for {
    select {
      ...
    case <-ticker:
      ....
      switch updatedProxy.Status.State {
      case core.Status_Pending:
        continue
      case core.Status_Rejected:
        contextutils.LoggerFrom(ctx).Errorf("proxy was rejected by gloo: %v",
          updatedProxy.Status.Reason)
        return nil
      case core.Status_Accepted:
        return s.markIngressesReady(ctx, ingresses)
      }
    }
  }
}
func (s *translatorSyncer) markIngressesReady(ctx context.Context, ingresses v1alpha1.IngressList) error {
  ...
  for _, ingress := range updatedIngresses {
    // 更新 Ingress 资源的 status 状态
    if _, err := s.ingressClient.Ingresses(ingress.Namespace).UpdateStatus(ingress); err != nil {
      contextutils.LoggerFrom(ctx).Errorf("failed to update Ingress %v status with error %v", ingress.Name, err)
    }
  }
  return nil
}

Discovery组件

Discovery 组件将监听 Kubernetes 环境中的 Service 资源,当有新 Service 被创建时,Discovery 组件会创建并维护 Upstreams 资源。RunUDS 函数是 Discovery 组件业务逻辑处理的入口函数,主要看其中的 StartUds 函数,Run 函数与 Ingress 中的逻辑类似,这里就不再赘述。

代码语言:javascript
复制
# projects/discovery/pkg/uds/syncer/setup_syncer.go

func RunUDS(opts bootstrap.Opts) error {
  ...
  uds := discovery.NewUpstreamDiscovery(watchNamespaces, opts.WriteNamespace, upstreamClient, discoveryPlugins)
  udsErrs, err := uds.StartUds(watchOpts, discovery.Opts{})
  ...
  eventLoop := v1.NewDiscoveryEventLoop(emitter, sync)
  eventLoopErrs, err := eventLoop.Run(opts.WatchNamespaces, watchOpts)
  ...
}

StartUds 函数会监听 Kubernetes Service 资源并将其转化为 Upstreams 资源,之后修改其 labels 标签,最后通过 Resync 函数将 Upstreams 资源同步到环境中。

代码语言:javascript
复制
# projects/gloo/pkg/discovery/discovery.go

func (d *UpstreamDiscovery) StartUds(opts clients.WatchOpts, discOpts Opts) (chan error, error) {
  aggregatedErrs := make(chan error)
  d.extraSelectorLabels = opts.Selector
  for _, uds := range d.discoveryPlugins {
    // 获取集群中的 Kubernetes Serivce 资源并将其转换为 Upstreams 资源
    upstreams, errs, err := uds.DiscoverUpstreams(d.watchNamespaces, d.writeNamespace, opts, discOpts)
    ...
    go func(uds DiscoveryPlugin) {
      udsName := strings.Replace(reflect.TypeOf(uds).String(), "*", "", -1)
      udsName = strings.Replace(udsName, ".", "", -1)
      for {
        select {
        case upstreamList := <-upstreams:
          d.lock.Lock()
          // 修改 labels 标签
          upstreamList = setLabels(udsName, upstreamList)
          d.latestDesiredUpstreams[uds] = upstreamList
          d.lock.Unlock()
          // 将构建的最新 Upstreams 资源同步到环境中
          if err := d.Resync(opts.Ctx); err != nil {
            aggregatedErrs <- errors.Wrapf(err, "error in uds plugin %v", reflect.TypeOf(uds).Name())
          }
          //......
      }
    }(uds)
  }
  return aggregatedErrs, nil
}

DiscoveryUpstreams 函数中会启动一个 goroutine,当资源变化时会触发 discoverUpstreams 函数,discoverUpstreams 中首先通过 List 函数获取各个 Namespace 下的 Kubernetes Service 资源,之后通过 ConvertServices 函数将 Service 资源转化为 Upstreams 资源。

代码语言:javascript
复制
# projects/gloo/pkg/plugins/kubernetes/uds.go

func (p *plugin) DiscoverUpstreams(watchNamespaces []string, writeNamespace string, opts clients.WatchOpts, discOpts discovery.Opts) (chan v1.UpstreamList, chan error, error) {
  //...
  discoverUpstreams := func() {
    var serviceList []*kubev1.Service
    for _, ns := range watchNamespaces {
      // 获取 Kubernetes Service 资源
      services, err := p.kubeCoreCache.NamespacedServiceLister(ns).List(labels.SelectorFromSet(opts.Selector))
      if err != nil {
        errs <- err
        return
      }
      serviceList = append(serviceList, services...)
    }
    // 转化为 Upstreams 资源
    upstreams := p.ConvertServices(ctx, watchNamespaces, serviceList, discOpts, writeNamespace)
    logger.Debugw("discovered services", "num", len(upstreams))
    upstreamsChan <- upstreams
  }
  go func() {
    //......
    discoverUpstreams()
    for {
      select {
      case _, ok := <-watch:
        if !ok {
          return
        }
        discoverUpstreams()
      case <-ctx.Done():
        return
      }
    }
  }()
  return upstreamsChan, errs, nil
}

Resync 函数会调用 Reconcile 将新的构建的 Upstreams 资源通过创建,更新,删除等方法同步到环境中。Reconcile 的具体实现可以参照之前 Ingress 章节中讲解的 Reconcile 函数,这里不再赘述。

代码语言:javascript
复制
# projects/gloo/pkg/discovery/discovery.go

func (d *UpstreamDiscovery) Resync(ctx context.Context) error {
  ...
  for uds, desiredUpstreams := range d.latestDesiredUpstreams {
    ...
    if err := d.upstreamReconciler.Reconcile(d.writeNamespace, desiredUpstreams, uds.UpdateUpstream, clients.ListOpts{
      Ctx:    ctx,
      Selector: selector,
    })
    ...
  }
  return nil
}

EnvoyInit组件

EnvoyInit 组件的功能相对简单,其只负责启动 envoy 代理。

main 函数为入口函数,逻辑比较简单,首先获取 envoy 代理的初始化配置,之后启动 envoy 代理。

代码语言:javascript
复制
# projects/envoyinit/cmd/main.go

func main() {
  // 获取配置信息
  outCfg, err := getConfig()
  ...
  env := os.Environ()
  args := []string{envoy(), "--config-yaml", outCfg}
  if len(os.Args) > 1 {
    args = append(args, os.Args[1:]...)
  }
  // 启动 envoy 代理的二进制文件
  if err := syscall.Exec(args[0], args, env); err != nil {
    panic(err)
  }
}

getConfig 函数会返回 envoy 的最终配置信息,首先inputCfg 会返回原始配置文件的路径,之后根据路径读取文件内容,最终通过 Transform渲染出最终配置。

代码语言:javascript
复制
# projects/envoyinit/cmd/main.go

func getConfig() (string, error) {
  inputfile := inputCfg()
  inreader, err := os.Open(inputfile)
  if err != nil {
    return "", err
  }
  defer inreader.Close()
  var buffer bytes.Buffer
  transformer := downward.NewTransformer()
  err = transformer.Transform(inreader, &buffer)
  if err != nil {
    return "", err
  }
  return buffer.String(), nil
}

其中,原始配置文件是通过 configmap knative-external-proxy-config 挂载到 EnvoyInit 的 Pod 中,其中包括 Go Template 模板,但在整个流程中并没有发现模板的渲染逻辑,这是因为模板渲染函数在 NewTransformer 中就确定了。NewTransformerTransformConfigTemplates 函数变量即用于渲染配置文件,而渲染最终会通过 interpolate 函数变量来实现。

代码语言:javascript
复制
# pkg/mod/github.com/solo-io/envoy-operator@v0.1.1/pkg/downward/transform.go

func NewTransformer() *Transformer {
  return &Transformer{
    transformations: []func(*envoy_config_v2.Bootstrap) error{TransformConfigTemplates},
  }
}

func TransformConfigTemplates(bootstrapConfig *envoy_config_v2.Bootstrap) error {
  api := RetrieveDownwardAPI()
  return TransformConfigTemplatesWithApi(bootstrapConfig, api)
}

func TransformConfigTemplatesWithApi(bootstrapConfig *envoy_config_v2.Bootstrap, api DownwardAPI) error {
  interpolate := func(s *string) error { return interpolator.InterpolateString(s, api) }
  ...
}

interpolator.InterpolateString 函数调用 Interpolate,而 Interpolate 函数中可以明显看到 Go Template 的渲染逻辑。

代码语言:javascript
复制
# pkg/mod/github.com/solo-io/envoy-operator@v0.1.1/pkg/downward/template.go

func (*interpolator) Interpolate(tmpl string, out io.Writer, data DownwardAPI) error {
  t, err := template.New("template").Option("missingkey=zero").Parse(tmpl)
  ...
  err = t.Execute(out, data)
  ...
}

func (i *interpolator) InterpolateString(tmpl *string, data DownwardAPI) error {
  var b bytes.Buffer
  err := i.Interpolate(*tmpl, &b, data)
  //......
}

Gloo组件

Gloo 遵循基于事件的体系结构,通过监视各种配置源进而获取 Proxy,Upstreams,Secrets,AuthConfigs 等相关资源并通过转换成对应的 Envoy 配置,同时启动 gRPC Server 端,等待 Envoy 与之通信。

NewSetupFunc函数为 Gloo 组件业务逻辑处理的入口函数,这里要关注两个函数 RunGloo 和 NewSetupFuncWithRunAndExtensions。

代码语言:javascript
复制
# projects/gloo/pkg/syncer/setup_syncer.go

func NewSetupFunc() setuputils.SetupFunc {
  return NewSetupFuncWithRunAndExtensions(RunGloo, nil)
}

首先看NewSetupFuncWithRunAndExtensions,其提供了一些默认配置

代码语言:javascript
复制
# projects/gloo/pkg/syncer/setup_syncer.go

func NewSetupFuncWithRunAndExtensions(runFunc RunFunc, extensions *Extensions) setuputils.SetupFunc {
  s := &setupSyncer{
    extensions: extensions,
    makeGrpcServer: func(ctx context.Context) *grpc.Server {
      return grpc.NewServer(grpc.StreamInterceptor(
        //......
      )
    },
    // 真正的业务函数
    runFunc: runFunc,
  }
  return s.Setup
}

这里 s.Setup 负责初始化工作并最终调用 runFunc 函数变量

代码语言:javascript
复制
projects/gloo/pkg/syncer/setup_syncer.go

func (s *setupSyncer) Setup(ctx context.Context, kubeCache kube.SharedCache, memCache memory.InMemoryResourceCache, settings *v1.Settings) error {
  xdsAddr := settings.GetGloo().GetXdsBindAddr()
  if xdsAddr == "" {
    xdsAddr = DefaultXdsBindAddr
  }
  xdsTcpAddress, err := getAddr(xdsAddr)
  if err != nil {
    return errors.Wrapf(err, "parsing xds addr")
  }
  ...
  err = s.runFunc(opts)
  ...
}

runFunc 即 NewSetupFunc 函数中的函数变量RunGloo,其中调用 RunGlooWithExtensions 函数。

代码语言:javascript
复制
# projects/gloo/pkg/syncer/setup_syncer.go

func RunGloo(opts bootstrap.Opts) error {
  return RunGlooWithExtensions(opts, Extensions{})
}

RunGlooWithExtensions 包含了Gloo所有的启动逻辑,首先监听 Upstreams、Proxy、Secrets 等资源并翻译为 Envoy 配置,NewTranslator 构造 Envoy 的配置翻译引擎,Run函数中会监听各种资源并最终触发translationSync 的Sync方法来同步 Envoy 配置,最终会启动 gRPC Server 端与 Envoy 通信。

代码语言:javascript
复制
projects/gloo/pkg/syncer/setup_syncer.go

func RunGlooWithExtensions(opts bootstrap.Opts, extensions Extensions) error {
  ...
  t := translator.NewTranslator(sslutils.NewSslConfigTranslator(), opts.Settings, getPlugins)
  ...
  translationSync := NewTranslatorSyncer(t, opts.ControlPlane.SnapshotCache, xdsHasher, xdsSanitizer, rpt, opts.DevMode, syncerExtensions, opts.Settings)
  syncers := v1.ApiSyncers{
    translationSync,
    validator,
  }
  apiEventLoop := v1.NewApiEventLoop(apiCache, syncers)
  apiEventLoopErrs, err := apiEventLoop.Run(opts.WatchNamespaces, watchOpts)
  if err != nil {
    return err
  }
  ....
  if opts.ControlPlane.StartGrpcServer {
    // copy for the go-routines
    controlPlane := opts.ControlPlane
    lis, err := net.Listen(opts.ControlPlane.BindAddr.Network(), opts.ControlPlane.BindAddr.String())
    if err != nil {
      return err
    }
    go func() {
      <-controlPlane.GrpcService.Ctx.Done()
      controlPlane.GrpcServer.Stop()
    }()

    go func() {
      if err := controlPlane.GrpcServer.Serve(lis); err != nil {
        logger.Errorf("xds grpc server failed to start")
      }
    }()
    opts.ControlPlane.StartGrpcServer = false
  }
  ...

}

Gloo 中的配置渲染逻辑涉及 Envoy 的具体实现,本文只涉及控制平面内容,故这里未作详细说明。

结语

Gloo 就是通过对资源的抽象分离来实现控制平面与数据平面的解耦,本文仅介绍了 Gloo 在 Knative 模式下涉及的源码,梳理了各组件的运行逻辑及互相之间的交互配合。Gloo 是一个新项目,随着项目的开发迭代,后续会推出有关文章,敬请期待!

由于笔者时间、视野、认知有限,本文难免出现错误、疏漏等问题,期待各位读者朋友、业界专家指正交流。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-05-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 DCOS 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档