Gloo基于Envoy实现,致力于成为下一代API网关标杆产品,其在函数级路由等方面表现优异,对旧式应用、微服务和serverless提供支持;与云原生标杆的开源项目(如Envoy、Kubernetes等)紧密集成。Gloo凭借生态支持的多样性、多云架构的灵活性,在云原生网关starups中脱颖而出。
文|董琪
编辑|zouyee
技术深度|适中
Gloo分析
Gloo 的运作由多个独立组件构成,为了方便日后运维和性能优化,有必要了解其具体实现机制。首先,本文会讲解各组件共用的代码部分,即支持动态更新的启动机制。之后再介绍各组件差异性功能的代码实现。
源码结构
Gloo 支持 Gateway,Knative,Ingress 三种模式运行,不同模式所需使用的组件也不同,其中 Knative 模式使用Discovery,EnvoyInit,Gloo,Ingress 四个组件,目录结构如下:
➜ projects git:(master) tree -L 1
.
├── accesslogger
├── clusteringress
├── discovery
├── envoyinit
├── examples
├── gateway
├── gloo
├── hypergloo
├── ingress
├── knative
└── sds
Serverless 中 Gloo 就使用的 Knative 模式,下面将针对 Knative 模式的 Gloo 进行源码讲解。
启动分析
Discovery,Ingress,Gloo 三个组件的启动代码类似,首先会定义处理实际业务逻辑的函数 SetupFunc,这部分各组件不同,之后都会监听 Setting CR(Custome Resource,自定义资源),该 CR 定义了组件的配置信息。当 Setting CR 被修改,SetupFunc 会根据新的 Setting CR 自动重启,达到动态更新配置,而不用重启组件 Pod 的目的。
组件的启动流程如上图所示,下面以 Ingress 组件的启动代码为例做具体分析。
1)main
main 函数作为程序的入口函数,调用 setup.Main
# projects/ingress/cmd/main.go
func main() {
if err := setup.Main(nil); err != nil {
log.Fatalf("err in main: %v", err.Error())
}
}
setup.Main中主要看两点:
1. setuputils.Main 函数,该函数是实现动态配置更新的入口函数
2. Setup 函数变量,该函数是用户自定义业务逻辑的入口函数
# pkg/utils/setuputils/main_setup.go
func Main(customCtx context.Context) error {
return setuputils.Main(setuputils.SetupOpts{
LoggerName: "ingress",
Version: version.Version,
SetupFunc: Setup,
ExitOnError: true,
CustomCtx: customCtx,
})
}
2) NewSetupEventLoop
实际入口 Main 函数中,我们需要关注 Setting 资源的监听和自定义业务逻辑的启动。
# pkg/utils/setuputils/main_setup.go
func Main(opts SetupOpts) error {
//....
emitter := v1.NewSetupEmitter(settingsClient)
settingsRef := core.ResourceRef{Namespace: setupNamespace, Name: setupName}
eventLoop := v1.NewSetupEventLoop(emitter, NewSetupSyncer(settingsRef, opts.SetupFunc))
errs, err := eventLoop.Run([]string{setupNamespace}, clients.WatchOpts{
Ctx: ctx,
RefreshRate: time.Second,
})
//...
return nil
}
NewSetupEmitter 函数以 Setting 客户端为参数,返回 SetupEmitter 接口,该接口嵌套 SetupSnapshotEmitter 接口并实现 Snapshots 方法。Snapshots 方法会监听 Setting 资源,并返回 channel 通道,并确保通道中的 Setting 资源最新。
# projects/gloo/pkg/api/v1/setup_snapshot_emitter.sk.go
type SetupSnapshotEmitter interface {
Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *SetupSnapshot, <-chan error, error)
}
type SetupEmitter interface {
SetupSnapshotEmitter
Register() error
Settings() SettingsClient
}
func NewSetupEmitter(settingsClient SettingsClient) SetupEmitter {
return NewSetupEmitterWithEmit(settingsClient, make(chan struct{}))
}
func NewSetupEmitterWithEmit(settingsClient SettingsClient, emit <-chan struct{}) SetupEmitter {
return &setupEmitter{
settings: settingsClient,
forceEmit: emit,
}
}
NewSetupEventLoop 负责业务逻辑函数的封装,其第二个参数为 NewSetupSyncer 函数的返回值。NewSetupSyncer 函数将 opts.SetupFunc 函数变量封装在 SetupSyncer结构体中。而 SetupSyncer结构体实现了 Sync 方法(之后 Run 函数中被调用),该方法中s.setupFunc 函数变量就是之前封装的业务逻辑处理函数 opts.SetupFunc。
# pkg/utils/setuputils/setup_syncer.go
func NewSetupSyncer(settingsRef core.ResourceRef, setupFunc SetupFunc) *SetupSyncer {
return &SetupSyncer{
settingsRef: settingsRef,
setupFunc: setupFunc,
inMemoryCache: memory.NewInMemoryResourceCache(),
}
}
func (s *SetupSyncer) Sync(ctx context.Context, snap *v1.SetupSnapshot) error {
settings, err := snap.Settings.Find(s.settingsRef.Strings())
//...
return s.setupFunc(ctx, kube.NewKubeCache(ctx), s.inMemoryCache, settings)
}
3) Run 函数
Run 函数用于启动之前的监听逻辑和组件自定义的业务逻辑。Snapshots 会监听 Setting 资源并将最新的资源通过 channel watch 返回;goroutine 中获取到最新的 snapshot 后调用 Sync 函数来启动组件自定义的业务逻辑。
# projects/gloo/pkg/api/v1/setup_event_loop.sk.go
func (el *setupEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
//...
watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
go func() {
//...
for {
select {
case snapshot, ok := <-watch:
//...
ctx, canc := context.WithCancel(ctx)
cancel = canc
err := el.syncer.Sync(ctx, snapshot)
//...
case <-opts.Ctx.Done():
return
}
}
}()
return errs, nil
}
4) Snapshots 函数
el.emitter.Snapshots 函数首先通过 List 函数获取 Setting 资源作为初始化列表,并通过Watch函数监听资源获取最新列表,之后通过 Hash 函数对比判断资源是否发生变化,如果发生变化则通过 channel 通道返回最新的 Setting 资源,否则继续监听。
# projects/gloo/pkg/api/v1/setup_snapshot_emitter.sk.go
func (c *setupEmitter) Snapshots(watchNamespaces []string, opts clients.WatchOpts) (<-chan *SetupSnapshot, <-chan error, error) {
//......
for _, namespace := range watchNamespaces {
// c.settings.List 列出所有 namespace 下的 Setting 资源
{
settings, err := c.settings.List(namespace, clients.ListOpts{Ctx: opts.Ctx, Selector: opts.Selector})
if err != nil {
return nil, nil, errors.Wrapf(err, "initial Settings list")
}
initialSettingsList = append(initialSettingsList, settings...)
}
// c.settings.Watch 监听 Setting 资源的变化,返回最新资源
settingsNamespacesChan, settingsErrs, err := c.settings.Watch(namespace, opts)
if err != nil {
return nil, nil, errors.Wrapf(err, "starting Settings watch")
}
// 对新的资源进行封装
go func(namespace string) {
for {
select {
case <-ctx.Done():
return
case settingsList := <-settingsNamespacesChan:
select {
case <-ctx.Done():
return
case settingsChan <- settingsListWithNamespace{list: settingsList, namespace: namespace}:
}
}
}
}(namespace)
}
// 对最开始 List 返回的 Setting 资源排序,用于之后的 Hash 比较
currentSnapshot.Settings = initialSettingsList.Sort()
snapshots := make(chan *SetupSnapshot)
go func() {
// sent initial snapshot to kick off the watch
initialSnapshot := currentSnapshot.Clone()
snapshots <- &initialSnapshot
timer := time.NewTicker(time.Second * 1)
previousHash, err := currentSnapshot.Hash(nil)
// 定义 sync 函数变量,对比 currentHash 和 previousHash Hash 值
// 相同则直接返回,不同的话会将当前最新资源发送到 snapshots 通道中
// snapshots 通道作为 Snapshots 方法的返回值,传递给调用者
sync := func() {
currentHash, err := currentSnapshot.Hash(nil)
// 第一次运行时,currentHash 和 previousHash 相同,直接返回
if previousHash == currentHash {
return
}
sentSnapshot := currentSnapshot.Clone()
select {
case snapshots <- &sentSnapshot:
previousHash = currentHash
}
settingsByNamespace := make(map[string]SettingsList)
for {
select {
// 定时调用 sync 函数确保将已变化的资源返回给 Snapshots 方法的调用者。
...
case settingsNamespacedList := <-settingsChan:
namespace := settingsNamespacedList.namespace
// merge lists by namespace
settingsByNamespace[namespace] = settingsNamespacedList.list
var settingsList SettingsList
for _, settings := range settingsByNamespace {
settingsList = append(settingsList, settings...)
}
// 从 settingsChan 通道中获取最新的资源,并赋值给 currentSnapshot 结构体
// 之后将在 sync 环境变量中计算 currentHash 并于 previsouHahs 比较
currentSnapshot.Settings = settingsList.Sort()
}
}
}()
return snapshots, errs, nil
}
5) Sync 函数
el.syncer.Sync 函数中会调用实际的业务逻辑函数,s.setupFunc 函数。该函数变量是在 NewSetupEventLoop 是进行赋值的。
# pkg/utils/setuputils/setup_syncer.go
func (s *SetupSyncer) Sync(ctx context.Context, snap *v1.SetupSnapshot) error {
settings, err := snap.Settings.Find(s.settingsRef.Strings())
//...
return s.setupFunc(ctx, kube.NewKubeCache(ctx), s.inMemoryCache, settings)
}
同时 el.syncer.Sync 函数运行在 Run 函数中的goroutine 中,当 Setting资源发生变化时,就会触发 el.syncer.Sync 函数的重新调用,从而达到动态更新配置,而不用重启组件 Pod 的目的。
# projects/gloo/pkg/api/v1/setup_event_loop.sk.go
func (el *setupEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
//.....
watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
go func() {
//.....
for {
select {
case snapshot, ok := <-watch:
err := el.syncer.Sync(ctx, snapshot)
//.....
}
}()
return errs, nil
}
至此,组件的启动代码分析完毕,下面将介绍 SetupFunc 函数,即各组件业务逻辑的部分。
ingress组件
Ingress 组件的业务逻辑是监听 Knative 环境中的 KIngress 资源,并根据 KIngress 资源创建维护 Proxy 资源。此外,成功创建或更新 Proxy 资源后 Ingress 组件会更新 KIngress 资源的 status 状态,以此告知 Knative 的 Controller 组件该 KIngress 路由信息已经成功配置,Knative 获取该状态后会逐层上报至 Route,KSVC 等资源。
# projects/ingress/pkg/setup/setup_syncer.go
func Setup(ctx context.Context, kubeCache kube.SharedCache, inMemoryCache memory.InMemoryResourceCache, settings *gloov1.Settings) error {
//.....
return RunIngress(opts)
}
RunIngress 函数中 knativeTranslatorSync 结构体变量非常重要,其自带函数变量 translateProxy用于生成 Proxy 资源,后面将详细介绍。RunIngress 函数最终会调用 Run 函数。
# projects/ingress/pkg/setup/setup_syncer.go
func RunIngress(opts Opts) error {
...
if opts.EnableKnative {
// 兼容旧版 Knative
if pre080knativeVersion(opts.KnativeVersion) {
...
} else {
...
knativeTranslatorEmitter := knativev1.NewTranslatorEmitter(ingressClient)
knativeTranslatorSync := knativetranslator.NewSyncer(
...
)
knativeTranslatorEventLoop := knativev1.NewTranslatorEventLoop(knativeTranslatorEmitter, knativeTranslatorSync)
knativeTranslatorEventLoopErrs, err := knativeTranslatorEventLoop.Run(opts.WatchNamespaces, opts.WatchOpts)
...
}
}
...
}
Run 函数内部逻辑与启动代码类似,只不过这里的 Run 函数实现的是 EventLoop 接口。Snapshots 函数这次监听处理的不再是 Setting 资源,而是Ingress资源(ingresses.networking.internal.knative.dev,区别于 K8s中的 Ingress),其余逻辑相同。
# projects/knative/pkg/api/v1/translator_event_loop.sk.go
func (el *translatorEventLoop) Run(namespaces []string, opts clients.WatchOpts) (<-chan error, error) {
...
watch, emitterErrs, err := el.emitter.Snapshots(namespaces, opts)
...
select {
case snapshot, ok := <-watch:
err := el.syncer.Sync(ctx, snapshot)
...
}
这里的 el.syncer.Sync 实现的是 TranslatorSyncer接口,且内部逻辑与启动代码中的也不同。具体如下:
下面将对每个步骤做具体分析。
# projects/knative/pkg/translator/translator_syncer.go
func (s *translatorSyncer) Sync(ctx context.Context, snap *v1.TranslatorSnapshot) error {
...
externalProxy, err := s.translateProxy(ctx, externalProxyName, s.writeNamespace, externalIngresses)
...
if err := s.proxyReconciler.Reconcile(s.writeNamespace, desiredResources, utils.TransitionFunction, clients.ListOpts{
Ctx: ctx,
Selector: labels,
}); err != nil {
return err
}
g := &errgroup.Group{}
g.Go(func() error {
if err := s.propagateProxyStatus(ctx, externalProxy, externalIngresses); err != nil {
return eris.Wrapf(err, "failed to propagate external proxy status "+
"to ingress objects")
}
return nil
})
//......
}
translateProxy 函数即是之前 RunIngress 函数中 knativeTranslatorSync 结构体变量中的函数变量。可以看出其通过 routingConfig 函数处理 Ingress 资源并最终构建 Proxy 资源。逻辑比较简单,不做详细介绍。
# projects/knative/pkg/translator/translate.go
func translateProxy(ctx context.Context, proxyName, proxyNamespace string, ingresses v1alpha1.IngressList) (*gloov1.Proxy, error) {
ingressSpecsByRef := make(map[*core.Metadata]knativev1alpha1.IngressSpec)
for _, ing := range ingresses {
meta := ing.GetMetadata()
ingressSpecsByRef[&meta] = ing.Spec
}
return TranslateProxyFromSpecs(ctx, proxyName, proxyNamespace, ingressSpecsByRef)
}
...
// made public to be shared with the (soon to be deprecated) clusteringress controller
func TranslateProxyFromSpecs(ctx context.Context, proxyName, proxyNamespace string, ingresses map[core.Metadata]knativev1alpha1.IngressSpec) (gloov1.Proxy, error) {
// 处理 Ingress 资源
virtualHostsHttp, virtualHostsHttps, sslConfigs, err := routingConfig(ctx, ingresses)
...
// 构造 Proxy 资源
return &gloov1.Proxy{
Metadata: core.Metadata{
Name: proxyName, // must match envoy role
Namespace: proxyNamespace,
},
Listeners: listeners,
}, nil
}
Reconcile 将构建出的 Proxy 资源同步到环境中去。可以看出 Reconcile函数的具体实现在 r.base.Reconcile 中。
# projects/gloo/pkg/api/v1/proxy_reconciler.sk.go
func (r *proxyReconciler) Reconcile(namespace string, desiredResources ProxyList, transition TransitionProxyFunc, opts clients.ListOpts) error {
//......
return r.base.Reconcile(namespace, proxysToResources(desiredResources), transitionResources, opts)
}
r.base.Reconcile
是第三方 github.com/solo-io/solo-kit/pkg/api/v1/reconcile
库中的函数。
func (r *reconciler) Reconcile(namespace string, desiredResources resources.ResourceList, transition TransitionResourcesFunc, opts clients.ListOpts) error {
opts = opts.WithDefaults()
opts.Ctx = contextutils.WithLogger(opts.Ctx, "reconciler")
originalResources, err := r.rc.List(namespace, opts)
if err != nil {
return err
}
for _, desired := range desiredResources {
// 创建或更新资源
if err := r.syncResource(opts.Ctx, desired, originalResources, transition); err != nil {
return errors.Wrapf(err, "reconciling resource %v", desired.GetMetadata().Name)
}
}
// 删除资源
for _, original := range originalResources {
unused := findResource(original.GetMetadata().Namespace, original.GetMetadata().Name, desiredResources) == nil
if unused {
if err := deleteStaleResource(opts.Ctx, r.rc, original); err != nil {
return errors.Wrapf(err, "deleting stale resource %v", original.GetMetadata().Name)
}
}
}
return nil
}
其中 r.rc.List 通过 opts 中特定的 labels 去获取当前环境中的 Proxy 资源,labels 即之前 Sync 方法中定义的map结构体,之后 syncResource 方法最终会调用 ResourceClient 的 Write 方法负责新建或更新资源,deleteStaleResource 会调用 ResourceClient 的 Delete 方法负责删除资源。调用链如下图所示。
propagateProxyStatus 函数用于更新 Ingress status 状态。当 Proxy 资源被 Gloo 组件成功获取后,Gloo 组件会修改 Proxy 资源的 status 状态,此时会调用 markIngressesReady 函数。markIngressesReady 最终会调用 UpdateStatus 函数去更新 Ingress 资源的 status 状态。
# projects/knative/pkg/translator/translator_syncer.go
//propagate to all ingresses the status of the proxy
func (s *translatorSyncer) propagateProxyStatus(ctx context.Context, proxy *gloov1.Proxy, ingresses v1alpha1.IngressList) error {
...
for {
select {
...
case <-ticker:
....
switch updatedProxy.Status.State {
case core.Status_Pending:
continue
case core.Status_Rejected:
contextutils.LoggerFrom(ctx).Errorf("proxy was rejected by gloo: %v",
updatedProxy.Status.Reason)
return nil
case core.Status_Accepted:
return s.markIngressesReady(ctx, ingresses)
}
}
}
}
func (s *translatorSyncer) markIngressesReady(ctx context.Context, ingresses v1alpha1.IngressList) error {
...
for _, ingress := range updatedIngresses {
// 更新 Ingress 资源的 status 状态
if _, err := s.ingressClient.Ingresses(ingress.Namespace).UpdateStatus(ingress); err != nil {
contextutils.LoggerFrom(ctx).Errorf("failed to update Ingress %v status with error %v", ingress.Name, err)
}
}
return nil
}
Discovery组件
Discovery 组件将监听 Kubernetes 环境中的 Service 资源,当有新 Service 被创建时,Discovery 组件会创建并维护 Upstreams 资源。RunUDS 函数是 Discovery 组件业务逻辑处理的入口函数,主要看其中的 StartUds 函数,Run 函数与 Ingress 中的逻辑类似,这里就不再赘述。
# projects/discovery/pkg/uds/syncer/setup_syncer.go
func RunUDS(opts bootstrap.Opts) error {
...
uds := discovery.NewUpstreamDiscovery(watchNamespaces, opts.WriteNamespace, upstreamClient, discoveryPlugins)
udsErrs, err := uds.StartUds(watchOpts, discovery.Opts{})
...
eventLoop := v1.NewDiscoveryEventLoop(emitter, sync)
eventLoopErrs, err := eventLoop.Run(opts.WatchNamespaces, watchOpts)
...
}
StartUds 函数会监听 Kubernetes Service 资源并将其转化为 Upstreams 资源,之后修改其 labels 标签,最后通过 Resync 函数将 Upstreams 资源同步到环境中。
# projects/gloo/pkg/discovery/discovery.go
func (d *UpstreamDiscovery) StartUds(opts clients.WatchOpts, discOpts Opts) (chan error, error) {
aggregatedErrs := make(chan error)
d.extraSelectorLabels = opts.Selector
for _, uds := range d.discoveryPlugins {
// 获取集群中的 Kubernetes Serivce 资源并将其转换为 Upstreams 资源
upstreams, errs, err := uds.DiscoverUpstreams(d.watchNamespaces, d.writeNamespace, opts, discOpts)
...
go func(uds DiscoveryPlugin) {
udsName := strings.Replace(reflect.TypeOf(uds).String(), "*", "", -1)
udsName = strings.Replace(udsName, ".", "", -1)
for {
select {
case upstreamList := <-upstreams:
d.lock.Lock()
// 修改 labels 标签
upstreamList = setLabels(udsName, upstreamList)
d.latestDesiredUpstreams[uds] = upstreamList
d.lock.Unlock()
// 将构建的最新 Upstreams 资源同步到环境中
if err := d.Resync(opts.Ctx); err != nil {
aggregatedErrs <- errors.Wrapf(err, "error in uds plugin %v", reflect.TypeOf(uds).Name())
}
//......
}
}(uds)
}
return aggregatedErrs, nil
}
DiscoveryUpstreams 函数中会启动一个 goroutine,当资源变化时会触发 discoverUpstreams 函数,discoverUpstreams 中首先通过 List 函数获取各个 Namespace 下的 Kubernetes Service 资源,之后通过 ConvertServices 函数将 Service 资源转化为 Upstreams 资源。
# projects/gloo/pkg/plugins/kubernetes/uds.go
func (p *plugin) DiscoverUpstreams(watchNamespaces []string, writeNamespace string, opts clients.WatchOpts, discOpts discovery.Opts) (chan v1.UpstreamList, chan error, error) {
//...
discoverUpstreams := func() {
var serviceList []*kubev1.Service
for _, ns := range watchNamespaces {
// 获取 Kubernetes Service 资源
services, err := p.kubeCoreCache.NamespacedServiceLister(ns).List(labels.SelectorFromSet(opts.Selector))
if err != nil {
errs <- err
return
}
serviceList = append(serviceList, services...)
}
// 转化为 Upstreams 资源
upstreams := p.ConvertServices(ctx, watchNamespaces, serviceList, discOpts, writeNamespace)
logger.Debugw("discovered services", "num", len(upstreams))
upstreamsChan <- upstreams
}
go func() {
//......
discoverUpstreams()
for {
select {
case _, ok := <-watch:
if !ok {
return
}
discoverUpstreams()
case <-ctx.Done():
return
}
}
}()
return upstreamsChan, errs, nil
}
Resync 函数会调用 Reconcile 将新的构建的 Upstreams 资源通过创建,更新,删除等方法同步到环境中。Reconcile 的具体实现可以参照之前 Ingress 章节中讲解的 Reconcile 函数,这里不再赘述。
# projects/gloo/pkg/discovery/discovery.go
func (d *UpstreamDiscovery) Resync(ctx context.Context) error {
...
for uds, desiredUpstreams := range d.latestDesiredUpstreams {
...
if err := d.upstreamReconciler.Reconcile(d.writeNamespace, desiredUpstreams, uds.UpdateUpstream, clients.ListOpts{
Ctx: ctx,
Selector: selector,
})
...
}
return nil
}
EnvoyInit组件
EnvoyInit 组件的功能相对简单,其只负责启动 envoy 代理。
main 函数为入口函数,逻辑比较简单,首先获取 envoy 代理的初始化配置,之后启动 envoy 代理。
# projects/envoyinit/cmd/main.go
func main() {
// 获取配置信息
outCfg, err := getConfig()
...
env := os.Environ()
args := []string{envoy(), "--config-yaml", outCfg}
if len(os.Args) > 1 {
args = append(args, os.Args[1:]...)
}
// 启动 envoy 代理的二进制文件
if err := syscall.Exec(args[0], args, env); err != nil {
panic(err)
}
}
getConfig 函数会返回 envoy 的最终配置信息,首先inputCfg 会返回原始配置文件的路径,之后根据路径读取文件内容,最终通过 Transform渲染出最终配置。
# projects/envoyinit/cmd/main.go
func getConfig() (string, error) {
inputfile := inputCfg()
inreader, err := os.Open(inputfile)
if err != nil {
return "", err
}
defer inreader.Close()
var buffer bytes.Buffer
transformer := downward.NewTransformer()
err = transformer.Transform(inreader, &buffer)
if err != nil {
return "", err
}
return buffer.String(), nil
}
其中,原始配置文件是通过 configmap knative-external-proxy-config
挂载到 EnvoyInit 的 Pod 中,其中包括 Go Template 模板,但在整个流程中并没有发现模板的渲染逻辑,这是因为模板渲染函数在 NewTransformer
中就确定了。NewTransformer
中 TransformConfigTemplates
函数变量即用于渲染配置文件,而渲染最终会通过 interpolate 函数变量来实现。
# pkg/mod/github.com/solo-io/envoy-operator@v0.1.1/pkg/downward/transform.go
func NewTransformer() *Transformer {
return &Transformer{
transformations: []func(*envoy_config_v2.Bootstrap) error{TransformConfigTemplates},
}
}
func TransformConfigTemplates(bootstrapConfig *envoy_config_v2.Bootstrap) error {
api := RetrieveDownwardAPI()
return TransformConfigTemplatesWithApi(bootstrapConfig, api)
}
func TransformConfigTemplatesWithApi(bootstrapConfig *envoy_config_v2.Bootstrap, api DownwardAPI) error {
interpolate := func(s *string) error { return interpolator.InterpolateString(s, api) }
...
}
interpolator.InterpolateString 函数调用 Interpolate,而 Interpolate 函数中可以明显看到 Go Template 的渲染逻辑。
# pkg/mod/github.com/solo-io/envoy-operator@v0.1.1/pkg/downward/template.go
func (*interpolator) Interpolate(tmpl string, out io.Writer, data DownwardAPI) error {
t, err := template.New("template").Option("missingkey=zero").Parse(tmpl)
...
err = t.Execute(out, data)
...
}
func (i *interpolator) InterpolateString(tmpl *string, data DownwardAPI) error {
var b bytes.Buffer
err := i.Interpolate(*tmpl, &b, data)
//......
}
Gloo组件
Gloo 遵循基于事件的体系结构,通过监视各种配置源进而获取 Proxy,Upstreams,Secrets,AuthConfigs 等相关资源并通过转换成对应的 Envoy 配置,同时启动 gRPC Server 端,等待 Envoy 与之通信。
NewSetupFunc函数为 Gloo 组件业务逻辑处理的入口函数,这里要关注两个函数 RunGloo 和 NewSetupFuncWithRunAndExtensions。
# projects/gloo/pkg/syncer/setup_syncer.go
func NewSetupFunc() setuputils.SetupFunc {
return NewSetupFuncWithRunAndExtensions(RunGloo, nil)
}
首先看NewSetupFuncWithRunAndExtensions,其提供了一些默认配置
# projects/gloo/pkg/syncer/setup_syncer.go
func NewSetupFuncWithRunAndExtensions(runFunc RunFunc, extensions *Extensions) setuputils.SetupFunc {
s := &setupSyncer{
extensions: extensions,
makeGrpcServer: func(ctx context.Context) *grpc.Server {
return grpc.NewServer(grpc.StreamInterceptor(
//......
)
},
// 真正的业务函数
runFunc: runFunc,
}
return s.Setup
}
这里 s.Setup 负责初始化工作并最终调用 runFunc 函数变量
projects/gloo/pkg/syncer/setup_syncer.go
func (s *setupSyncer) Setup(ctx context.Context, kubeCache kube.SharedCache, memCache memory.InMemoryResourceCache, settings *v1.Settings) error {
xdsAddr := settings.GetGloo().GetXdsBindAddr()
if xdsAddr == "" {
xdsAddr = DefaultXdsBindAddr
}
xdsTcpAddress, err := getAddr(xdsAddr)
if err != nil {
return errors.Wrapf(err, "parsing xds addr")
}
...
err = s.runFunc(opts)
...
}
runFunc 即 NewSetupFunc 函数中的函数变量RunGloo,其中调用 RunGlooWithExtensions 函数。
# projects/gloo/pkg/syncer/setup_syncer.go
func RunGloo(opts bootstrap.Opts) error {
return RunGlooWithExtensions(opts, Extensions{})
}
RunGlooWithExtensions 包含了Gloo所有的启动逻辑,首先监听 Upstreams、Proxy、Secrets 等资源并翻译为 Envoy 配置,NewTranslator 构造 Envoy 的配置翻译引擎,Run函数中会监听各种资源并最终触发translationSync 的Sync方法来同步 Envoy 配置,最终会启动 gRPC Server 端与 Envoy 通信。
projects/gloo/pkg/syncer/setup_syncer.go
func RunGlooWithExtensions(opts bootstrap.Opts, extensions Extensions) error {
...
t := translator.NewTranslator(sslutils.NewSslConfigTranslator(), opts.Settings, getPlugins)
...
translationSync := NewTranslatorSyncer(t, opts.ControlPlane.SnapshotCache, xdsHasher, xdsSanitizer, rpt, opts.DevMode, syncerExtensions, opts.Settings)
syncers := v1.ApiSyncers{
translationSync,
validator,
}
apiEventLoop := v1.NewApiEventLoop(apiCache, syncers)
apiEventLoopErrs, err := apiEventLoop.Run(opts.WatchNamespaces, watchOpts)
if err != nil {
return err
}
....
if opts.ControlPlane.StartGrpcServer {
// copy for the go-routines
controlPlane := opts.ControlPlane
lis, err := net.Listen(opts.ControlPlane.BindAddr.Network(), opts.ControlPlane.BindAddr.String())
if err != nil {
return err
}
go func() {
<-controlPlane.GrpcService.Ctx.Done()
controlPlane.GrpcServer.Stop()
}()
go func() {
if err := controlPlane.GrpcServer.Serve(lis); err != nil {
logger.Errorf("xds grpc server failed to start")
}
}()
opts.ControlPlane.StartGrpcServer = false
}
...
}
Gloo 中的配置渲染逻辑涉及 Envoy 的具体实现,本文只涉及控制平面内容,故这里未作详细说明。
结语
Gloo 就是通过对资源的抽象分离来实现控制平面与数据平面的解耦,本文仅介绍了 Gloo 在 Knative 模式下涉及的源码,梳理了各组件的运行逻辑及互相之间的交互配合。Gloo 是一个新项目,随着项目的开发迭代,后续会推出有关文章,敬请期待!
由于笔者时间、视野、认知有限,本文难免出现错误、疏漏等问题,期待各位读者朋友、业界专家指正交流。