# yaml 安装
kubectl apply -f https://projectcontour.io/quickstart/contour.yaml
# 验证
kubectl get pod -n projectcontour
# 添加仓库
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install contour --namespace projectcontour bitnami/contour
部署完成后查看安装的资源信息,包括:
$ kubectl get deploy,svc,ds,job -n projectcontour
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/contour 2/2 2 2 12d
NAME TYPE CLUSTER-IP EXTERNAL-IP
service/contour ClusterIP 172.20.126.255 <none> 8001/TCP 12d
service/envoy LoadBalancer 172.20.12.164 ...
NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
daemonset.apps/envoy 10 10 10 10 10 <none> 12d
contour 安装完成后,查看 crd 有哪些:
$ kubectl get crd|grep contour
contourconfigurations.projectcontour.io 2021-11-04T13:30:06Z
contourdeployments.projectcontour.io 2021-11-04T13:30:06Z
extensionservices.projectcontour.io 2021-11-04T13:30:06Z
httpproxies.projectcontour.io 2021-11-04T13:30:06Z
tlscertificatedelegations.projectcontour.io 2021-11-04T13:30:06Z
其中最核心的 CRD 是 HTTPProxy,等效于 k8s 提供的原生 Ingress,提供路由配置功能。
看一个简单的 httpproxy 配置示例,当访问 foo1.bar.com 时,将代理到后端 s1 服务的 80 端口
将 foo1.bar.com 域名和任意一个Node节点(daemonset部署)的ip做域名绑定后,即可通过域名访问服务
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
name: name-example-foo
namespace: default
spec:
virtualhost:
fqdn: foo1.bar.com
routes:
- conditions:
- prefix: /
services:
- name: s1
port: 80
contour 主要包含两个组件:
两个组件是分开部署的,Contour 以 Deployment 的方式部署,Envoy 以 DaemonSet 的方式部署
envoy pod 中包含两个 container
contour bootstrap
命令,生成一个配置文件到临时目录,envoy container共享这个目录,并作为启动的配置文件,里面配置了 contour server 地址,作为 envoy 的管理服务从图中我们还看到其他的一些信息
Ingress 资源在 k8s 1.1 版本引入,从那以后,Ingress API 保持相对稳定,需要使用特殊的能力需要借助于 Annotation 实现。
HttpProxy CRD 的目标是扩展 Ingress API 的能力,为用户提供更丰富的功能体验,以及解决 Ingress 在多租户环境下的限制。
# ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: basic
spec:
rules:
- host: foo-basic.bar.com
http:
paths:
- backend:
service:
name: s1
port:
number: 80
# httpproxy.yaml
apiVersion: projectcontour.io/v1
kind: HTTPProxy
metadata:
name: basic
spec:
virtualhost:
fqdn: foo-basic.bar.com
routes:
- conditions:
- prefix: /
services:
- name: s1
port: 80
通过前面 contour 的基本原理和架构的介绍,总结如下:
实际上在将 k8s 资源转换为最终 xds 资源的过程中,还经过了其他的数据转换。
go-control-plane
这个开源框架k8s 提供了 informer 机制,可以 watch kubernetes资源的变更。contour中 watch 的资源包括两类:
k8s原生资源:
crd资源:
// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
// 这里监听 contour 的 crd 资源, 比如:httpproxy
for _, r := range k8s.DefaultResources() {
inf, err := clients.InformerForResource(r)
......
inf.AddEventHandler(&dynamicHandler)
}
// 监听 ingress 资源
for _, r := range k8s.IngressV1Resources() {
if err := informOnResource(clients, r, &dynamicHandler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
......
// 监听 secret 资源
for _, r := range k8s.SecretsResources() {
......
if err := informOnResource(clients, r, handler); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
// 监听 endpoint 资源
for _, r := range k8s.EndpointsResources() {
if err := informOnResource(clients, r, &k8s.DynamicClientHandler{
Next: &contour.EventRecorder{
Next: endpointHandler,
Counter: contourMetrics.EventHandlerOperations,
},
Converter: converter,
Logger: log.WithField("context", "endpointstranslator"),
}); err != nil {
log.WithError(err).WithField("resource", r).Fatal("failed to create informer")
}
}
......
}
不管是 原生资源,还是 crd 资源,Informer 监听的原理都是一样的,用户只需要编写事件处理函数,对应的接口是 ResourceEventHandler,该接口提供了增加、删除、更新回调函数。
// k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
Contour 中所有的资源复用了两个 ResourceEventHandler 的实现类:
EventHandler 将得到的资源信息,放入channel中,由单独的协程去消费以解耦
func (e *EventHandler) OnAdd(obj interface{}) {
e.update <- opAdd{obj: obj}
}
func (e *EventHandler) OnUpdate(oldObj, newObj interface{}) {
e.update <- opUpdate{oldObj: oldObj, newObj: newObj}
}
func (e *EventHandler) OnDelete(obj interface{}) {
e.update <- opDelete{obj: obj}
}
放入channel的信息,由谁去消费呢?
监听 channel 的协程,拿到事件后,取出资源信息,保存到 KubernetesCache 中。
// contour/cmd/contour/server.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
......
// 启动协程
g.Add(eventHandler.Start())
......
}
// counter/internal/contour/handler.go
func (e *EventHandler) Start() func(<-chan struct{}) error {
e.update = make(chan interface{})
return e.run
}
func (e *EventHandler) run(stop <-chan struct{}) error {
......
// 死循环,一直监听 channel 中的事件
for {
select {
case op := <-e.update:
if e.onUpdate(op) {
......
}
......
}
// 处理事件
// 将资源保存到 e.Builder.Source
func (e *EventHandler) onUpdate(op interface{}) bool {
switch op := op.(type) {
case opAdd:
return e.Builder.Source.Insert(op.obj)
case opUpdate:
......
remove := e.Builder.Source.Remove(op.oldObj)
insert := e.Builder.Source.Insert(op.newObj)
return remove || insert
case opDelete:
return e.Builder.Source.Remove(op.obj)
case bool:
return op
default:
return false
}
}
EndpointsTranslator 监听的 endpoint 资源,处理比较麻烦一些,以 add 为例分析
// contour/internal/xdscache/v3/endpointstranslator.go
func (e *EndpointsTranslator) OnAdd(obj interface{}) {
switch obj := obj.(type) {
case *v1.Endpoints:
// 更新本地缓存
e.cache.UpdateEndpoint(obj)
// 合并资源
e.Merge(e.cache.Recalculate())
e.Notify()
if e.Observer != nil {
// 刷新保存快照
e.Observer.Refresh()
}
default:
e.Errorf("OnAdd unexpected type %T: %#v", obj, obj)
}
}
类中定义了多个map类型的资源对象,分别存储不同的资源,key是 namespace 和 name 组成的 type.NamespacedName,将原生k8s中不同 namespace 的同一资源都存储到一个map中,多种资源的多个map,共同组成 KubernetesCache
// contour/internal/dag/cache.go
type KubernetesCache struct {
......
ingresses map[types.NamespacedName]*networking_v1.Ingress
ingressclass *networking_v1.IngressClass
httpproxies map[types.NamespacedName]*contour_api_v1.HTTPProxy
secrets map[types.NamespacedName]*v1.Secret
tlscertificatedelegations map[types.NamespacedName]*contour_api_v1.TLSCertificateDelegation
services map[types.NamespacedName]*v1.Service
namespaces map[string]*v1.Namespace
gatewayclass *gatewayapi_v1alpha1.GatewayClass
gateway *gatewayapi_v1alpha1.Gateway
httproutes map[types.NamespacedName]*gatewayapi_v1alpha1.HTTPRoute
tlsroutes map[types.NamespacedName]*gatewayapi_v1alpha1.TLSRoute
tcproutes map[types.NamespacedName]*gatewayapi_v1alpha1.TCPRoute
udproutes map[types.NamespacedName]*gatewayapi_v1alpha1.UDPRoute
backendpolicies map[types.NamespacedName]*gatewayapi_v1alpha1.BackendPolicy
extensions map[types.NamespacedName]*contour_api_v1alpha1.ExtensionService
......
}
// map中key的数据类型:由 namespace 和 name 共同决定
type NamespacedName struct {
Namespace string
Name string
}
KubernetesCache 插入流程:
// contour/internal/dag/cache.go
func (kc *KubernetesCache) Insert(obj interface{}) bool {
// 初始化 map,确保执行一次
kc.initialize.Do(kc.init)
// 根据不同的资源类型,放入不同的 map
switch obj := obj.(type) {
case *v1.Secret:
......
// 保存 secret 资源
kc.secrets[k8s.NamespacedNameOf(obj)] = obj
return kc.secretTriggersRebuild(obj)
case *v1.Service:
kc.services[k8s.NamespacedNameOf(obj)] = obj
return kc.serviceTriggersRebuild(obj)
case *v1.Namespace:
kc.namespaces[obj.Name] = obj
return true
......
}
return false
}
构造好 KubernetesCache 之后,contour 通过 一些列的 processor,将 KubernetesCache 变成 DAG,DAG 代表了 k8s 不同资源关系的一个有向无环图。核心代码如下:
// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
// 将 KubernetesCache 构建成 DAG
latestDAG := e.Builder.Build()
// DAG 变更时,通知所有的 Observer,
// 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
e.Observer.OnChange(latestDAG)
// 更新 snapshot 信息
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.StatusUpdater.Send(upd)
}
}
DAG 数据结构如下,根节点 roots 是一个 Vertex 的列表,Vertex 只定义了Visit接口。即:所有实现了 Visit
// contour/internal/dag/dag.go
type DAG struct {
// StatusCache holds a cache of status updates to send.
StatusCache status.Cache
// roots are the root vertices of this DAG.
roots []Vertex
}
// Vertex is a node in the DAG that can be visited.
type Vertex interface {
Visit(func(Vertex))
}
build过程分析:
// contour/internal/dag/builder.go
func (b *Builder) Build() *DAG {
// 先构造一个空的 DAG
dag := DAG{
StatusCache: status.NewCache(b.Source.ConfiguredGateway),
}
// 遍历所有的 processor,每个 processor 把不同的 k8s 资源变成 DAG 的一部分信息
// 所有的 processor 共同构成完整的 DAG
for _, p := range b.Processors {
p.Run(&dag, &b.Source)
}
return &dag
}
processor 列表包括:
func getDAGBuilder(ctx *serveContext, clients *k8s.Clients, clientCert, fallbackCert *types.NamespacedName, log logrus.FieldLogger) dag.Builder {
dagProcessors := []dag.Processor{
&dag.IngressProcessor{
...
},
&dag.ExtensionServiceProcessor{
...
},
&dag.HTTPProxyProcessor{
...
},
}
if ctx.Config.GatewayConfig != nil && clients.ResourcesExist(k8s.GatewayAPIResources()...) {
dagProcessors = append(dagProcessors, &dag.GatewayAPIProcessor{
FieldLogger: log.WithField("context", "GatewayAPIProcessor"),
})
}
...
dagProcessors = append(dagProcessors, &dag.ListenerProcessor{})
...
}
每个processor 实现 Run 方法,接收 DAG 和 KubernetesCache 参数,通过 KubernetesCache 修改 DAG,以 HTTPProxyProcessor 为例分析。
func (p *HTTPProxyProcessor) Run(dag *DAG, source *KubernetesCache) {
...
// 校验合法性
for _, proxy := range p.validHTTPProxies() {
// 计算路由规则
// 入参为 crd 资源 HttpProxy
// 内部经过各种计算,构造路由关系,最终保存在DAG中
p.computeHTTPProxy(proxy)
}
for meta := range p.orphaned {
proxy, ok := p.source.httpproxies[meta]
if ok {
pa, commit := p.dag.StatusCache.ProxyAccessor(proxy)
pa.ConditionFor(status.ValidCondition).AddError(contour_api_v1.ConditionTypeOrphanedError,
"Orphaned",
"this HTTPProxy is not part of a delegation chain from a root HTTPProxy")
commit()
}
}
}
DAG 中的节点(Vertex)有:
其中某些 Vectex 又包含子 Vectex,因此用图这种数据结构串联起来。
前面分析 DAG 代码时,在生成 DAG 后,会通知所有的 Observer,即e.Observer.OnChange
// contour/internal/contour/handler.go
func (e *EventHandler) rebuildDAG() {
// 将 KubernetesCache 构建成 DAG
latestDAG := e.Builder.Build()
// DAG 变更时,通知所有的 Observer,
// 每种 Observer 是一个 XDS 对象,取出 DAG 资源转换为 XDS 配置
e.Observer.OnChange(latestDAG)
// 更新 snapshot 信息
for _, upd := range latestDAG.StatusCache.GetStatusUpdates() {
e.StatusUpdater.Send(upd)
}
}
这里的 e.Observer 是 一堆 Observer的组合
// contour/cmd/contour/serve.go
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
...
// ResoureCache 列表
resources := []xdscache.ResourceCache{
xdscache_v3.NewListenerCache(listenerConfig, ctx.statsAddr, ctx.statsPort),
&xdscache_v3.SecretCache{},
&xdscache_v3.RouteCache{},
&xdscache_v3.ClusterCache{},
endpointHandler,
}
...
eventHandler := &contour.EventHandler{
...
// 注册一个组合的 Observer
Observer: dag.ComposeObservers(append(xdscache.ObserversOf(resources), snapshotHandler)...),
...
}
}
// 组合多个Observer
func ComposeObservers(observers ...Observer) Observer {
return ObserverFunc(func(d *DAG) {
for _, o := range observers {
o.OnChange(d)
}
})
}
ResourceCache 是一个接口,聚合了两个接口:dag.Observer 和 xds.Resource。该接口定义了如何处理 DAG,以及处理完成后如何返回给调用方。
// contour/internal/xdscache/resources.go
type ResourceCache interface {
dag.Observer
xds.Resource
}
// contour/internal/dag/dag.go
type Observer interface {
// 当 DAG变更时,触发相应业务逻辑
OnChange(*DAG)
}
// contour/internal/xds/resource.go
type Resource interface {
// 返回全部资源内容
// Contents returns the contents of this resource.
Contents() []proto.Message
// 通过给定的名称,返回对应的资源内容
// Query returns an entry for each resource name supplied.
Query(names []string) []proto.Message
// 注册资源
// Register registers ch to receive a value when Notify is called.
Register(chan int, int, ...string)
// 资源类型
// TypeURL returns the typeURL of messages returned from Values.
TypeURL() string
}
不同的 xds 资源类型都实现了ResourceCache 接口:
转换后的 xds 资源都保存在 values
字段中。
// contour/internal/xdscache/v3/cluster.go
// ClusterCache manages the contents of the gRPC CDS cache.
type ClusterCache struct {
mu sync.Mutex
values map[string]*envoy_cluster_v3.Cluster
contour.Cond
}
// contour/internal/xdscache/v3/listener.go
// ListenerCache manages the contents of the gRPC LDS cache.
type ListenerCache struct {
mu sync.Mutex
values map[string]*envoy_listener_v3.Listener
staticValues map[string]*envoy_listener_v3.Listener
Config ListenerConfig
contour.Cond
}
// contour/internal/xdscache/v3/route.go
// RouteCache manages the contents of the gRPC RDS cache.
type RouteCache struct {
mu sync.Mutex
values map[string]*envoy_route_v3.RouteConfiguration
contour.Cond
}
// contour/internal/xdscache/v3/secret.go
// SecretCache manages the contents of the gRPC SDS cache.
type SecretCache struct {
mu sync.Mutex
values map[string]*envoy_tls_v3.Secret
contour.Cond
}
// contour/internal/xdscache/v3/endpointstranslator.go
// A EndpointsTranslator translates Kubernetes Endpoints objects into Envoy
// ClusterLoadAssignment resources.
type EndpointsTranslator struct {
// Observer notifies when the endpoints cache has been updated.
Observer contour.Observer
contour.Cond
logrus.FieldLogger
cache EndpointsCache
mu sync.Mutex // Protects entries.
entries map[string]*envoy_endpoint_v3.ClusterLoadAssignment
}
以 RouteCache 的 OnChange 为例,查看实现逻辑
// contour/internal/xdscache/v3/route.go
func (c *RouteCache) OnChange(root *dag.DAG) {
// 读取 DAG 中的信息,生成 envoy 的 RDS 资源 envoy_route_v3.RouteConfiguration
routes := visitRoutes(root)
// 将配置文件放到 value 字段中,供 Query、Context 等方法调用时查看
c.Update(routes)
}
DAG 转换为 RDS 的逻辑实现
// contour/internal/xdscache/v3/route.go
func visitRoutes(root dag.Vertex) map[string]*envoy_route_v3.RouteConfiguration {
// Collect the route configurations for all the routes we can
// find. For HTTP hosts, the routes will all be collected on the
// well-known ENVOY_HTTP_LISTENER, but for HTTPS hosts, we will
// generate a per-vhost collection. This lets us keep different
// SNI names disjoint when we later configure the listener.
// http 和 https 需要做不同的处理
rv := routeVisitor{
routes: map[string]*envoy_route_v3.RouteConfiguration{
ENVOY_HTTP_LISTENER: envoy_v3.RouteConfiguration(ENVOY_HTTP_LISTENER),
},
}
rv.visit(root)
for _, v := range rv.routes {
sort.Stable(sorter.For(v.VirtualHosts))
}
return rv.routes
}
// contour/internal/xdscache/v3/route.go
// 从根节点开始查找所有的 Route,查找顺序: Listener -> VirtualHost(SecureVirtualHost) -> Route
func (v *routeVisitor) visit(vertex dag.Vertex) {
switch l := vertex.(type) {
// 处理 DAG 中的 Listener
case *dag.Listener:
l.Visit(func(vertex dag.Vertex) {
switch vh := vertex.(type) {
// 处理 VirtualHost
case *dag.VirtualHost:
v.onVirtualHost(vh)
// 处理 SecureVirtualHost
case *dag.SecureVirtualHost:
v.onSecureVirtualHost(vh)
default:
// recurse
vertex.Visit(v.visit)
}
})
default:
// recurse
vertex.Visit(v.visit)
}
}
// 查找 Listener.VirtualHost.Route
func (v *routeVisitor) onVirtualHost(vh *dag.VirtualHost) {
var routes []*dag.Route
vh.Visit(func(v dag.Vertex) {
route, ok := v.(*dag.Route)
if !ok {
return
}
// 将所有的 route 收集到列表中
routes = append(routes, route)
})
if len(routes) == 0 {
return
}
// 定义了把 dag.Route 对象转换为 envoy_route_v3.Route 对象的方法
// 将两种数据格式进行转换
toEnvoyRoute := func(route *dag.Route) *envoy_route_v3.Route {
...
rt := &envoy_route_v3.Route{
Match: envoy_v3.RouteMatch(route),
Action: envoy_v3.RouteRoute(route),
}
...
return rt
}
// 对 route 排序
sortRoutes(routes)
// 这里生成的配置,存储到 routes 这个map中,key 是 ingress_http
v.routes[ENVOY_HTTP_LISTENER].VirtualHosts = append(v.routes[ENVOY_HTTP_LISTENER].VirtualHosts, toEnvoyVirtualHost(vh, routes, toEnvoyRoute))
}
从前面的分析我们知道,数据最终被转换为多个 ResourceCache 的列表,这些数据最终有两个用途:
func doServe(log logrus.FieldLogger, ctx *serveContext) error {
...
// 用于生成快照,这部分比较简单,不做介绍
snapshotHandler := xdscache.NewSnapshotHandler(resources, log.WithField("context", "snapshotHandler"))
...
// 注册服务到 GRPC
// 先通过 NewContourServer 生成一个 GRPC 服务,再注册到 grpcServer
switch ctx.Config.Server.XDSServerType {
// 根据启动参数的不同配置,实例化不同的对象
// serverType = "envoy"
case config.EnvoyServerType:
v3cache := contour_xds_v3.NewSnapshotCache(false, log)
snapshotHandler.AddSnapshotter(v3cache)
// 调用 NewServer 构造对象
// 这个方法是 go-control-plane 内置的方法
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(taskCtx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
// serverType = "contour",这个是默认配置
case config.ContourServerType:
// 调用 NewContourServer 构造对象
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
...
}
下面分析数据经过 grpc 下发的流程,这部分主要是使用 go-control-plane
sdk
注册 ADS、SDS、CDS、EDS、LDS、RDS 等都使用了 srv 对象,可以推断 Server 继承了 各种 XDS的接口
// contour/internal/xds/v3/server.go
// 以下注册方法均调用的 go-control-plane sdk
// 包括注册 ADS、SDS、CDS、EDS、LDS、RDS
func RegisterServer(srv Server, g *grpc.Server) {
// register services
envoy_service_discovery_v3.RegisterAggregatedDiscoveryServiceServer(g, srv)
envoy_service_secret_v3.RegisterSecretDiscoveryServiceServer(g, srv)
envoy_service_cluster_v3.RegisterClusterDiscoveryServiceServer(g, srv)
envoy_service_endpoint_v3.RegisterEndpointDiscoveryServiceServer(g, srv)
envoy_service_listener_v3.RegisterListenerDiscoveryServiceServer(g, srv)
envoy_service_route_v3.RegisterRouteDiscoveryServiceServer(g, srv)
}
// 真正实例化的 Server 对象是 `contourServer`
// 将前面准备好的 ResourceCache 列表传进来
func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server {
c := contourServer{
FieldLogger: log,
resources: map[string]xds.Resource{},
}
// 按照 TypeURL 作为 key,保存资源
for i, r := range resources {
c.resources[r.TypeURL()] = resources[i]
}
return &c
}
// contour/internal/xds/v3/contour.go
type contourServer struct {
// Since we only implement the streaming state of the world
// protocol, embed the default null implementations to handle
// the unimplemented gRPC endpoints.
envoy_service_discovery_v3.UnimplementedAggregatedDiscoveryServiceServer
envoy_service_secret_v3.UnimplementedSecretDiscoveryServiceServer
envoy_service_route_v3.UnimplementedRouteDiscoveryServiceServer
envoy_service_endpoint_v3.UnimplementedEndpointDiscoveryServiceServer
envoy_service_cluster_v3.UnimplementedClusterDiscoveryServiceServer
envoy_service_listener_v3.UnimplementedListenerDiscoveryServiceServer
logrus.FieldLogger
// 所有生成的 Envoy XDS 信息都传进来保存到这个字段
resources map[string]xds.Resource
connections xds.Counter
}
把 ResourceCache 传给 contourServer,contourServer作为GRPC的server注册到GRPC上,go-control-plane 框架封装了各个XDS数据传输的接口,contourServer 实现这些接口,从而实现数据下发
func (s *contourServer) StreamClusters(srv envoy_service_cluster_v3.ClusterDiscoveryService_StreamClustersServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamEndpoints(srv envoy_service_endpoint_v3.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamListeners(srv envoy_service_listener_v3.ListenerDiscoveryService_StreamListenersServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamRoutes(srv envoy_service_route_v3.RouteDiscoveryService_StreamRoutesServer) error {
return s.stream(srv)
}
func (s *contourServer) StreamSecrets(srv envoy_service_secret_v3.SecretDiscoveryService_StreamSecretsServer) error {
return s.stream(srv)
}
所有的 XDS Stream方法内部都调用了 s.stream 方法
func (s *contourServer) stream(st grpcStream) error {
...
// 持续监听连接,并处理请求
// now stick in this loop until the client disconnects.
for {
// 获取 envoy 发送来的请求
req, err := st.Recv()
...
// 根据请求的 TypeURL,找到匹配的 ResourceCache
r, ok := s.resources[req.GetTypeUrl()]
...
// 注册监听器
r.Register(ch, last, req.ResourceNames...)
select {
case last = <-ch:
var resources []proto.Message
switch len(req.ResourceNames) {
// 如果没有指定资源名称,返回所有资源
case 0:
// 调用前面介绍过的 Contents 方法,从 ResourceCache 的 value 中取到 envoy 配置
resources = r.Contents()
// 如果指定资源名称,查找特定资源
default:
// resource hints supplied, return exactly those
resources = r.Query(req.ResourceNames)
}
// 将 ResourceCache 转换成 PB 数据
any := make([]*any.Any, 0, len(resources))
for _, r := range resources {
a, err := anypb.New(proto.MessageV2(r))
if err != nil {
return done(log, err)
}
any = append(any, a)
}
// 构造 grpc response 对象
resp := &envoy_service_discovery_v3.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: req.GetTypeUrl(),
Nonce: strconv.Itoa(last),
}
// 将 response 通过 grpc 发送给客户端 envoy
if err := st.Send(resp); err != nil {
return done(log, err)
}
case <-ctx.Done():
return done(log, ctx.Err())
}
}
}
contour会根据 ctx.Config.Server.XDSServerType 属性的值,初始化不同的 server。
Go-control-plane 提供的 pb 接口包括三类:
XXX 代表不同的 XDS,比如 StreamClusters。
contourServer 只实现了所有 XDS 的 Stream 相关方法,并没有实现 Delta,Fetch 相关方法
前面介绍了 contour 下发数据到 envoy 的流程,那最开始 envoy是如何连接上 contour的?
查看 envoy daemonset yaml文件,可以看到,内部配置了一个 initContainer,用于生成本地配置文件,并和 envoy 业务容器共享,envoy 启动时根据配置文件启动。启动命令中还指定了 envoy 连接的 grpc server 地址,即 contour的地址,这里配置的是 k8s svc 名称
initContainers:
- args:
- bootstrap
- /config/envoy.json
- --xds-address=contour
- --xds-port=8001
- --xds-resource-version=v3
- --resources-dir=/config/resources
- --envoy-cafile=/certs/ca.crt
- --envoy-cert-file=/certs/tls.crt
- --envoy-key-file=/certs/tls.key
进入 envoy 容器可以查看生成的配置内容
kubectl -n projectcontour exec -it envoy-65qfq -c envoy -- cat /config/envoy.json
{
"static_resources": {
"clusters": [
{
"name": "contour",
"alt_stat_name": "projectcontour_contour_8001",
"type": "STRICT_DNS",
"connect_timeout": "5s",
"load_assignment": {
"cluster_name": "contour",
"endpoints": [
{
"lb_endpoints": [
{
"endpoint": {
"address": {
"socket_address": {
"address": "contour",
"port_value": 8001
}
}
}
}
]
}
]
},
"circuit_breakers": {
"thresholds": [
{
"priority": "HIGH",
"max_connections": 100000,
"max_pending_requests": 100000,
"max_requests": 60000000,
"max_retries": 50
},
{
"max_connections": 100000,
"max_pending_requests": 100000,
"max_requests": 60000000,
"max_retries": 50
}
]
},
"typed_extension_protocol_options": {
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": {
"@type": "type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions",
"explicit_http_config": {
"http2_protocol_options": {}
}
}
},
"transport_socket": {
"name": "envoy.transport_sockets.tls",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext",
"common_tls_context": {
"tls_params": {
"tls_maximum_protocol_version": "TLSv1_3"
},
"tls_certificate_sds_secret_configs": [
{
"name": "contour_xds_tls_certificate",
"sds_config": {
"path": "/config/resources/sds/xds-tls-certificate.json",
"resource_api_version": "V3"
}
}
],
"validation_context_sds_secret_config": {
"name": "contour_xds_tls_validation_context",
"sds_config": {
"path": "/config/resources/sds/xds-validation-context.json",
"resource_api_version": "V3"
}
}
}
}
},
"upstream_connection_options": {
"tcp_keepalive": {
"keepalive_probes": 3,
"keepalive_time": 30,
"keepalive_interval": 5
}
}
},
{
"name": "envoy-admin",
"alt_stat_name": "projectcontour_envoy-admin_9001",
"type": "STATIC",
"connect_timeout": "0.250s",
"load_assignment": {
"cluster_name": "envoy-admin",
"endpoints": [
{
"lb_endpoints": [
{
"endpoint": {
"address": {
"pipe": {
"path": "/admin/admin.sock",
"mode": 420
}
}
}
}
]
}
]
}
}
]
},
"dynamic_resources": {
"lds_config": {
"api_config_source": {
"api_type": "GRPC",
"transport_api_version": "V3",
"grpc_services": [
{
"envoy_grpc": {
"cluster_name": "contour"
}
}
]
},
"resource_api_version": "V3"
},
"cds_config": {
"api_config_source": {
"api_type": "GRPC",
"transport_api_version": "V3",
"grpc_services": [
{
"envoy_grpc": {
"cluster_name": "contour"
}
}
]
},
"resource_api_version": "V3"
}
},
"admin": {
"access_log": [
{
"name": "envoy.access_loggers.file",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog",
"path": "/dev/null"
}
}
],
"address": {
"pipe": {
"path": "/admin/admin.sock",
"mode": 420
}
}
}
}
k8s本身没有内置 ingress 控制器,ingress 控制器呈现百花齐放的状态,参考官方文档:ingress 控制器。
不同的控制器,底层使用的网络代理不同,主要有:nginx、envoy、kong、haproxy等。envoy 作为云原生时代的网络代理,被越来越多的开源项目使用。比如:istio、contour、gloo、。这些项目的本质,都是简化 envoy 的配置,封装更简单的路由规则,通过内部的逻辑转换,变成envoy 的xds配置。
go-control-plane 框架封装了基于grpc下发xds配置,可以方便开发者开发自己的控制面。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。