前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >istio的数据存储和事件处理

istio的数据存储和事件处理

作者头像
有点技术
发布2021-01-08 15:12:30
6750
发布2021-01-08 15:12:30
举报
文章被收录于专栏:有点技术有点技术

数据对象

ConfigStore

ConfigStore描述了基础平台必须支持的一组平台无关的API,以存储和检索Istio配置。配置键定义为配置对象的类型,名称和命名空间的组合。保证配置密钥在存储中是唯一的。此处显示的存储接口假定基础存储层支持_Get_(列表),_Update_(更新),_Create_(创建)和_Delete_语义,但不保证任何事务语义。_Update_,_ Create_,和_Delete_是变量操作。这些操作是异步的,您可能不会立即看到效果(例如,在对存储进行更改后,_Get_可能不会立即通过键返回对象。)即使操作成功,也可能会出现间歇性错误,因此您应始终检查对象存储是否已被修改即使变异操作返回错误。应该使用_Create_操作创建对象并使用_Update_操作更新对象。资源版本记录每个对象上的最后一个变异操作。如果将变异应用于对象的修订版本与纯等式定义的基础存储所期望的版本不同,则操作将被阻止。此接口的客户端不应假设版本标识符的结构或顺序。从此接口提供和返回的对象引用应视为只读。修改它们会违反线程安全性。

ConfigStoreCache

ConfigStoreCache是配置存储的本地完全复制的缓存。缓存主动将其本地状态与远程存储同步,并提供通知机制以接收更新事件。这样,通知处理程序必须在调用_Run_之前注册,并且缓存在调用_Run_之后需要初始同步宽限期。

更新通知要求以下一致性保证:通知到达时,缓存中的视图必须至少是最新的,但是可能更新鲜(例如_Delete_取消_Add_事件)。

处理程序按照附加的顺序在单个工作程序队列上执行。处理程序接收通知事件和关联的对象。请注意,在启动缓存控制器之前,必须注册所有处理程序。

ConfigStoreCache 相较于ConfigStore多了三个方法

代码语言:javascript
复制
type ConfigStoreCache interface {    ConfigStore    // RegisterEventHandler 为指定的类型的更新时间添加处理器    RegisterEventHandler(kind config.GroupVersionKind, handler func(config.Config, config.Config, Event))    // 运行    Run(stop <-chan struct{})    // 初始高速缓存同步完成后,HasSynced返回true    HasSynced() bool}

handler

当Config发生变化,处理event事件

代码语言:javascript
复制
handler func(config.Config, config.Config, Event)

ConfigStoreCache 类型

memory

istio中有两处用到了memory ConfigStore

•监听registry配置文件变化,写入到mem store

初始化一个mapstore := memory.Make(collections.Pilot)构造为ConfigStoreCacheconfigController := memory.NewController(store)监听文件变化来处理事件err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)if err != nil { return err}s.ConfigStores = append(s.ConfigStores, configController)

•监听xds数据变化,写入到mem store

代码语言:javascript
复制
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{    Meta: model.NodeMetadata{        Generator: "api",    }.ToStruct(),    InitialDiscoveryRequests: adsc.ConfigInitialRequests(),})if err != nil {    return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)}store := memory.Make(collections.Pilot)configController := memory.NewController(store)xdsMCP.Store = model.MakeIstioStore(configController)err = xdsMCP.Run()if err != nil {    return fmt.Errorf("MCP: failed running %v", err)}s.ConfigStores = append(s.ConfigStores, configController)log.Warn("Started XDS config ", s.ConfigStores)

ingress

监听k8s ingress资源变化

代码语言:javascript
复制
ingress.NewController(s.kubeClient, s.environment.Watcher, args.RegistryOptions.KubeOptions)

Kubernetes

监听k8s资源变化

代码语言:javascript
复制
// 创建一个crd监听资源变化configController, err := s.makeKubeConfigController(args)if err != nil {    return err}s.ConfigStores = append(s.ConfigStores, configController)// 如果启用了service-api 则需要获取service-api资源if features.EnableServiceApis {    s.ConfigStores = append(s.ConfigStores, gateway.NewController(s.kubeClient, configController, args.RegistryOptions.KubeOptions))}

configaggregate

将所有的 []model.ConfigStoreCache转化为一个,从而统一管理,注册对应的EventHandler

代码语言:javascript
复制
configaggregate.MakeCache(s.ConfigStores)

eventhandler

configHandler

用来监听以下crd obj的事件变化

代码语言:javascript
复制
Pilot = collection.NewSchemasBuilder().        MustAdd(IstioNetworkingV1Alpha3Destinationrules).        MustAdd(IstioNetworkingV1Alpha3Envoyfilters).        MustAdd(IstioNetworkingV1Alpha3Gateways).        MustAdd(IstioNetworkingV1Alpha3Serviceentries).        MustAdd(IstioNetworkingV1Alpha3Sidecars).        MustAdd(IstioNetworkingV1Alpha3Virtualservices).        MustAdd(IstioNetworkingV1Alpha3Workloadentries).        MustAdd(IstioNetworkingV1Alpha3Workloadgroups).        MustAdd(IstioSecurityV1Beta1Authorizationpolicies).        MustAdd(IstioSecurityV1Beta1Peerauthentications).        MustAdd(IstioSecurityV1Beta1Requestauthentications).        Build()

confighandler的具体实现

代码语言:javascript
复制
configHandler := func(_, curr config.Config, event model.Event) {    pushReq := &model.PushRequest{        Full: true,        ConfigsUpdated: map[model.ConfigKey]struct{}{{            Kind:      curr.GroupVersionKind,            Name:      curr.Name,            Namespace: curr.Namespace,        }: {}},        Reason: []model.TriggerReason{model.ConfigUpdate},    }    s.XDSServer.ConfigUpdate(pushReq)    if event != model.EventDelete {        s.statusReporter.AddInProgressResource(curr)    } else {        s.statusReporter.DeleteInProgressResource(curr)    }}

workloadentryhandler

WorkloadEntryHandler定义WorkloadEntry的handler

代码语言:javascript
复制
func (s *ServiceEntryStore) workloadEntryHandler(old, curr config.Config, event model.Event) {    var oldWle *networking.WorkloadEntry    if old.Spec != nil {        oldWle = old.Spec.(*networking.WorkloadEntry)    }    wle := curr.Spec.(*networking.WorkloadEntry)    key := configKey{        kind:      workloadEntryConfigType,        name:      curr.Name,        namespace: curr.Namespace,    }    if features.WorkloadEntryHealthChecks && !isHealthy(curr) {        event = model.EventDelete    }    // fire off the k8s handlers    if len(s.workloadHandlers) > 0 {        si := convertWorkloadEntryToWorkloadInstance(curr)        if si != nil {            for _, h := range s.workloadHandlers {                h(si, event)            }        }    }    s.storeMutex.RLock()    // 获取同命名空间的entries    entries := s.seWithSelectorByNamespace[curr.Namespace]    s.storeMutex.RUnlock()    // if there are no service entries, return now to avoid taking unnecessary locks    if len(entries) == 0 {        return    }    log.Debugf("Handle event %s for workload entry %s in namespace %s", event, curr.Name, curr.Namespace)    instancesUpdated := []*model.ServiceInstance{}    instancesDeleted := []*model.ServiceInstance{}    workloadLabels := labels.Collection{wle.Labels}    fullPush := false    configsUpdated := map[model.ConfigKey]struct{}{}    for _, se := range entries {        selected := false        // workloadentry不满足serviceentry label selector        if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {            // 更新操作            if oldWle != nil {                oldWorkloadLabels := labels.Collection{oldWle.Labels}                // 如果老的workloadentry满足,则需要删除掉原有的endpoint                if oldWorkloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {                    selected = true                    instance := convertWorkloadEntryToServiceInstances(oldWle, se.services, se.entry, &key)                    instancesDeleted = append(instancesDeleted, instance...)                }            }        } else {            // 满足labelselector 更新为新的            selected = true            instance := convertWorkloadEntryToServiceInstances(wle, se.services, se.entry, &key)            instancesUpdated = append(instancesUpdated, instance...)        }        if selected {            // serviceentry 解析方式为DNS,全量更新            if se.entry.Resolution == networking.ServiceEntry_DNS {                fullPush = true                for key, value := range getUpdatedConfigs(se.services) {                    configsUpdated[key] = value                }            }        }    }    if len(instancesDeleted) > 0 {        s.deleteExistingInstances(key, instancesDeleted)    }    if event != model.EventDelete {        s.updateExistingInstances(key, instancesUpdated)    } else {        s.deleteExistingInstances(key, instancesUpdated)    }    // 非全量值更新eds    if !fullPush {        s.edsUpdate(append(instancesUpdated, instancesDeleted...), true)        // trigger full xds push to the related sidecar proxy        if event == model.EventAdd {            s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)        }        return    }    // 更新eds cache    s.edsUpdate(append(instancesUpdated, instancesDeleted...), false)    pushReq := &model.PushRequest{        Full:           true,        ConfigsUpdated: configsUpdated,        Reason:         []model.TriggerReason{model.EndpointUpdate},    }    // 全量更新  ads    s.XdsUpdater.ConfigUpdate(pushReq)}

serviceEntryHandler

serviceEntryHandler 为 service entries定义handler

代码语言:javascript
复制
func (s *ServiceEntryStore) serviceEntryHandler(old, curr config.Config, event model.Event) {    // 转换为svc列表    cs := convertServices(curr)    configsUpdated := map[model.ConfigKey]struct{}{}    // 如果是添加/删除事件,始终进行完全推送。如果是update事件,则仅当服务已更改时,我们才应进行完全推送-否则,只需推送端点更新即可。    var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service    switch event {    case model.EventUpdate:        // 转换为svc列表        os := convertServices(old)        // 对比新老serviceentry的workloadselector,变化则进行全量更新        if selectorChanged(old, curr) {            // 更新所有的svc            mark := make(map[host.Name]*model.Service, len(cs))            for _, svc := range cs {                mark[svc.Hostname] = svc                updatedSvcs = append(updatedSvcs, svc)            }            for _, svc := range os {                if _, f := mark[svc.Hostname]; !f {                    updatedSvcs = append(updatedSvcs, svc)                }            }        } else {            // 对比差异            addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(os, cs)        }    case model.EventDelete:        deletedSvcs = cs    case model.EventAdd:        addedSvcs = cs    default:        unchangedSvcs = cs    }    for _, svc := range addedSvcs {        s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd)        configsUpdated[makeConfigKey(svc)] = struct{}{}    }    for _, svc := range updatedSvcs {        s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate)        configsUpdated[makeConfigKey(svc)] = struct{}{}    }    // 清理endpoint shards    for _, svc := range deletedSvcs {        s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete)        configsUpdated[makeConfigKey(svc)] = struct{}{}    }   if len(unchangedSvcs) > 0 {           currentServiceEntry := curr.Spec.(*networking.ServiceEntry)           oldServiceEntry := old.Spec.(*networking.ServiceEntry)           // 解析dns 且ep变化则更新           if currentServiceEntry.Resolution == networking.ServiceEntry_DNS {               if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) {                   // fqdn endpoints have changed. Need full push                   for _, svc := range unchangedSvcs {                       configsUpdated[makeConfigKey(svc)] = struct{}{}                   }               }           }       }       fullPush := len(configsUpdated) > 0       // 没有服务变化,即为update操作且没有更新svc       if !fullPush {           // STATIC服务条目中的IP端点已更改。我们需要EDS更新如果是全量推送,则将edsUpdate保留。我们应该对所有未更改的Svcs进行edsUpdate,           instances := convertServiceEntryToInstances(curr, unchangedSvcs)           key := configKey{               kind:      serviceEntryConfigType,               name:      curr.Name,               namespace: curr.Namespace,           }           //如果后端实例变更,为变更的实例更新索引           s.updateExistingInstances(key, instances)           s.edsUpdate(instances, true)           return       }    // 在这里重新计算索引太昂贵-需要时进行延迟构建。如果服务已更改,则仅重新计算索引。    s.refreshIndexes.Store(true)    // 进行完全推送时,非DNS添加,更新,未更改的服务会触发eds更新,以便更新endpointshards。    allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))    nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs))    allServices = append(allServices, addedSvcs...)    allServices = append(allServices, updatedSvcs...)    allServices = append(allServices, unchangedSvcs...)    for _, svc := range allServices {        if svc.Resolution != model.DNSLB {            nonDNSServices = append(nonDNSServices, svc)        }    }    // 非dns服务    keys := map[instancesKey]struct{}{}    for _, svc := range nonDNSServices {        keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{}    }    // 更新eds endpoint shards    s.edsUpdateByKeys(keys, false)    pushReq := &model.PushRequest{        Full:           true,        ConfigsUpdated: configsUpdated,        Reason:         []model.TriggerReason{model.ServiceUpdate},    }    s.XdsUpdater.ConfigUpdate(pushReq)}

kubecontroller.WorkloadInstanceHandler

当worloadentry更新时触发k8s对应service的更新及缓存

代码语言:javascript
复制
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {    // ignore malformed workload entries. And ignore any workload entry that does not have a label    // as there is no way for us to select them    if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {        return    }    // this is from a workload entry. Store it in separate map so that    // the InstancesByPort can use these as well as the k8s pods.    c.Lock()    switch event {    case model.EventDelete:        delete(c.workloadInstancesByIP, si.Endpoint.Address)    default: // add or update        // Check to see if the workload entry changed. If it did, clear the old entry        k := si.Namespace + "/" + si.Name        existing := c.workloadInstancesIPsByName[k]        if existing != si.Endpoint.Address {            delete(c.workloadInstancesByIP, existing)        }        c.workloadInstancesByIP[si.Endpoint.Address] = si        c.workloadInstancesIPsByName[k] = si.Endpoint.Address    }    c.Unlock()    // find the workload entry's service by label selector    // rather than scanning through our internal map of model.services, get the services via the k8s apis    dummyPod := &v1.Pod{        ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},    }    // find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb    if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {        for _, k8sSvc := range k8sServices {            var service *model.Service            c.RLock()            service = c.servicesMap[kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.domainSuffix)]            c.RUnlock()            // Note that this cannot be an external service because k8s external services do not have label selectors.            if service == nil || service.Resolution != model.ClientSideLB {                // may be a headless service                continue            }            // Get the updated list of endpoints that includes k8s pods and the workload entries for this service            // and then notify the EDS server that endpoints for this service have changed.            // We need one endpoint object for each service port            endpoints := make([]*model.IstioEndpoint, 0)            for _, port := range service.Ports {                if port.Protocol == protocol.UDP {                    continue                }                // Similar code as UpdateServiceShards in eds.go                instances := c.InstancesByPort(service, port.Port, labels.Collection{})                for _, inst := range instances {                    endpoints = append(endpoints, inst.Endpoint)                }            }            // fire off eds update            c.xdsUpdater.EDSUpdate(c.clusterID, string(service.Hostname), service.Attributes.Namespace, endpoints)        }    }}

serviceentrystore.WorkloadInstanceHandler

当k8spod变化时触发对应service entry的变化

代码语言:javascript
复制
// WorkloadInstanceHandler defines the handler for service instances generated by other registriesfunc (s *ServiceEntryStore) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {    key := configKey{        kind:      workloadInstanceConfigType,        name:      wi.Name,        namespace: wi.Namespace,    }    // Used to indicate if this event was fired for a pod->workloadentry conversion    // and that the event can be ignored due to no relevant change in the workloadentry    redundantEventForPod := false    var addressToDelete string    s.storeMutex.Lock()    // this is from a pod. Store it in separate map so that    // the refreshIndexes function can use these as well as the store ones.    k := wi.Namespace + "/" + wi.Name    switch event {    case model.EventDelete:        if _, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; !exists {            // multiple delete events for the same pod (succeeded/failed/unknown status repeating).            redundantEventForPod = true        } else {            delete(s.workloadInstancesByIP, wi.Endpoint.Address)            delete(s.workloadInstancesIPsByName, k)        }    default: // add or update        // Check to see if the workload entry changed. If it did, clear the old entry        existing := s.workloadInstancesIPsByName[k]        if existing != "" && existing != wi.Endpoint.Address {            delete(s.workloadInstancesByIP, existing)            addressToDelete = existing        }        if old, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; exists {            // If multiple k8s services select the same pod or a service has multiple ports,            // we may be getting multiple events ignore them as we only care about the Endpoint IP itself.            if model.WorkloadInstancesEqual(old, wi) {                // ignore the update as nothing has changed                redundantEventForPod = true            }        }        s.workloadInstancesByIP[wi.Endpoint.Address] = wi        s.workloadInstancesIPsByName[k] = wi.Endpoint.Address    }    // We will only select entries in the same namespace    entries := s.seWithSelectorByNamespace[wi.Namespace]    s.storeMutex.Unlock()    // nothing useful to do.    if len(entries) == 0 || redundantEventForPod {        return    }    log.Debugf("Handle event %s for service instance (from %s) in namespace %s", event,        wi.Endpoint.Address, wi.Namespace)    instances := []*model.ServiceInstance{}    instancesDeleted := []*model.ServiceInstance{}    for _, se := range entries {        workloadLabels := labels.Collection{wi.Endpoint.Labels}        if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {            // Not a match, skip this one            continue        }        instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, se.services, se.entry)        instances = append(instances, instance...)        if addressToDelete != "" {            for _, i := range instance {                di := i.DeepCopy()                di.Endpoint.Address = addressToDelete                instancesDeleted = append(instancesDeleted, di)            }        }    }    if len(instancesDeleted) > 0 {        s.deleteExistingInstances(key, instancesDeleted)    }    if event != model.EventDelete {        s.updateExistingInstances(key, instances)    } else {        s.deleteExistingInstances(key, instances)    }    s.edsUpdate(instances, true)}

serviceentry 感知pod变化

默认开启,当pod发生变化时进行调用

代码语言:javascript
复制
    if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {        // 在kubernetes registry中添加实例处理程序,以通知serviceentry存储有关Pod事件的信息        kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)    }

具体处理逻辑

代码语言:javascript
复制
// WorkloadInstanceHandler defines the handler for service instances generated by other registriesfunc (s *ServiceEntryStore) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) {    key := configKey{        kind:      workloadInstanceConfigType,        name:      wi.Name,        namespace: wi.Namespace,    }    // 用于标明是否为pod-> workloadentry转换触发了此事件,并且由于工作负载项没有相关更改而可以忽略该事件    redundantEventForPod := false    var addressToDelete string    s.storeMutex.Lock()    // 从pod里获取 将其存储在单独的映射中,以便refreshIndexes函数可以使用存储的对象    k := wi.Namespace + "/" + wi.Name    switch event {    case model.EventDelete:        if _, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; !exists {            // 同一pod的多个删除事件(成功/失败/未知状态重复)            redundantEventForPod = true        } else {            delete(s.workloadInstancesByIP, wi.Endpoint.Address)            delete(s.workloadInstancesIPsByName, k)        }    default: // add or update        // 检查是否更新        existing := s.workloadInstancesIPsByName[k]        if existing != "" && existing != wi.Endpoint.Address {            delete(s.workloadInstancesByIP, existing)            addressToDelete = existing        }        if old, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; exists {            // 如果多个k8s服务选择同一个Pod或一个服务具有多个端口,则由于我们只关心端点IP本身,我们可能会收到多个事件忽略它们。            if model.WorkloadInstancesEqual(old, wi) {                redundantEventForPod = true            }        }        s.workloadInstancesByIP[wi.Endpoint.Address] = wi        s.workloadInstancesIPsByName[k] = wi.Endpoint.Address    }    // 只过滤同命名空间的对象    entries := s.seWithSelectorByNamespace[wi.Namespace]    s.storeMutex.Unlock()    if len(entries) == 0 || redundantEventForPod {        return    }    instances := []*model.ServiceInstance{}    instancesDeleted := []*model.ServiceInstance{}    for _, se := range entries {        workloadLabels := labels.Collection{wi.Endpoint.Labels}        // 不满足筛选条件则跳过        if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) {            continue        }        // 添加新的instance        instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, se.services, se.entry)        instances = append(instances, instance...)        // 删除原有的        if addressToDelete != "" {            for _, i := range instance {                di := i.DeepCopy()                di.Endpoint.Address = addressToDelete                instancesDeleted = append(instancesDeleted, di)            }        }    }    // 更新缓存    if len(instancesDeleted) > 0 {        s.deleteExistingInstances(key, instancesDeleted)    }    if event != model.EventDelete {        s.updateExistingInstances(key, instances)    } else {        s.deleteExistingInstances(key, instances)    }    // 触发eds    s.edsUpdate(instances, true)}

service感知workloadentry变化

默认开启

代码语言:javascript
复制
    if localCluster {        // TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry        if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {            // Add an instance handler in the service entry store to notify kubernetes about workload entry events            m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)        }    }
代码语言:javascript
复制
// WorkloadInstanceHandler定义其他registry生成服务实例的处理程序func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) {    // 忽略命名空间或label为空的情况    if si.Namespace == "" || len(si.Endpoint.Labels) == 0 {        return    }    // 更新缓存    c.Lock()    switch event {    case model.EventDelete:        delete(c.workloadInstancesByIP, si.Endpoint.Address)    default:        k := si.Namespace + "/" + si.Name        existing := c.workloadInstancesIPsByName[k]        if existing != si.Endpoint.Address {            delete(c.workloadInstancesByIP, existing)        }        c.workloadInstancesByIP[si.Endpoint.Address] = si        c.workloadInstancesIPsByName[k] = si.Endpoint.Address    }    c.Unlock()    // ,通过k8s api获取服务    dummyPod := &v1.Pod{        ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels},    }    // 找到映射到该工作负载条目的服务,如果该服务属于客户端lb类型,则触发eds更新    if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 {        for _, k8sSvc := range k8sServices {            var service *model.Service            c.RLock()            service = c.servicesMap[kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.domainSuffix)]            c.RUnlock()            // 不能是外部服务,因为k8s外部服务没有标签选择器。            if service == nil || service.Resolution != model.ClientSideLB {                // 可能是headless svc                continue            }            // 获取包括该服务的k8s pod和工作负载条目的endpoints的更新列表,然后通知EDS服务器该服务的端点已更改。每个服务端口需要一个endpoint对象            endpoints := make([]*model.IstioEndpoint, 0)            for _, port := range service.Ports {                if port.Protocol == protocol.UDP {                    continue                }                instances := c.InstancesByPort(service, port.Port, labels.Collection{})                for _, inst := range instances {n                    endpoints = append(endpoints, inst.Endpoint)                }            }            // 触发EDS更新            c.xdsUpdater.EDSUpdate(c.clusterID, string(service.Hostname), service.Attributes.Namespace, endpoints)        }    }}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有点技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据对象
    • ConfigStore
      • ConfigStoreCache
        • handler
        • ConfigStoreCache 类型
          • memory
            • ingress
              • Kubernetes
                • configaggregate
                • eventhandler
                  • configHandler
                    • workloadentryhandler
                      • serviceEntryHandler
                        • kubecontroller.WorkloadInstanceHandler
                          • serviceentrystore.WorkloadInstanceHandler
                          • serviceentry 感知pod变化
                          • service感知workloadentry变化
                          相关产品与服务
                          容器服务
                          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                          领券
                          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档