ConfigStore描述了一组与平台无关的API,底层平台必须支持这些API以存储和检索Istio配置。配置键定义为配置对象的类型、名称和命名空间的组合,并保证在存储中唯一。此处给出的存储接口假定底层存储层支持_Get_(列表)、_Update_(更新)、_Create_(创建)和_Delete_(删除)语义,但不保证任何事务语义。_Update_、_Create_和_Delete_是变更(mutator)操作。这些操作是异步的,您可能不会立即看到效果(例如,在对存储进行更改后,_Get_可能不会立即按键返回该对象)。即使操作实际已成功,也可能出现间歇性错误,因此即便变更操作返回了错误,您也应始终检查对象存储是否已被修改。应使用_Create_操作创建对象,使用_Update_操作更新对象。资源版本记录每个对象上最后一次变更操作。如果变更所针对的对象修订版本与底层存储所期望的版本不同(按纯相等性判断),则该操作将被阻止。此接口的客户端不应对版本标识符的结构或顺序做任何假设。向此接口提供以及从此接口返回的对象引用应视为只读,修改它们会破坏线程安全性。
ConfigStoreCache是配置存储的本地完全复制的缓存。缓存主动将其本地状态与远程存储同步,并提供通知机制以接收更新事件。因此,通知处理程序必须在调用_Run_之前注册,并且缓存在调用_Run_之后需要一段初始同步宽限期。
更新通知需要满足以下一致性保证:缓存中的视图必须至少与通知到达时刻一样新,但也可能更新(例如,一个_Delete_事件抵消了之前的_Add_事件)。
处理程序按照附加的顺序在单个工作程序队列上执行。处理程序接收通知事件和关联的对象。请注意,在启动缓存控制器之前,必须注册所有处理程序。
ConfigStoreCache 相较于ConfigStore多了三个方法
type ConfigStoreCache interface { ConfigStore // RegisterEventHandler 为指定的类型的更新时间添加处理器 RegisterEventHandler(kind config.GroupVersionKind, handler func(config.Config, config.Config, Event)) // 运行 Run(stop <-chan struct{}) // 初始高速缓存同步完成后,HasSynced返回true HasSynced() bool}
当Config发生变化,处理event事件
handler func(config.Config, config.Config, Event)
istio中有两处用到了memory ConfigStore
•监听registry配置文件变化,写入到mem store
初始化一个mapstore := memory.Make(collections.Pilot)构造为ConfigStoreCacheconfigController := memory.NewController(store)监听文件变化来处理事件err := s.makeFileMonitor(args.RegistryOptions.FileDir, args.RegistryOptions.KubeOptions.DomainSuffix, configController)if err !=
nil
{
return err}s.ConfigStores
= append(s.ConfigStores, configController)
•监听xds数据变化,写入到mem store
// Create an ADS client for the configured XDS source.
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
	Meta: model.NodeMetadata{
		Generator: "api",
	}.ToStruct(),
	InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
})
if err != nil {
	return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
}
// Back the client with an in-memory store wrapped in a controller, so that
// received config lands in the mem store.
store := memory.Make(collections.Pilot)
configController := memory.NewController(store)
xdsMCP.Store = model.MakeIstioStore(configController)
err = xdsMCP.Run()
if err != nil {
	return fmt.Errorf("MCP: failed running %v", err)
}
// Register the controller alongside the other config stores.
s.ConfigStores = append(s.ConfigStores, configController)
log.Warn("Started XDS config ", s.ConfigStores)
监听k8s ingress资源变化
ingress.NewController(s.kubeClient, s.environment.Watcher, args.RegistryOptions.KubeOptions)
监听k8s资源变化
// 创建一个crd监听资源变化configController, err := s.makeKubeConfigController(args)if err != nil { return err}s.ConfigStores = append(s.ConfigStores, configController)// 如果启用了service-api 则需要获取service-api资源if features.EnableServiceApis { s.ConfigStores = append(s.ConfigStores, gateway.NewController(s.kubeClient, configController, args.RegistryOptions.KubeOptions))}
将所有的 []model.ConfigStoreCache转化为一个,从而统一管理,注册对应的EventHandler
configaggregate.MakeCache(s.ConfigStores)
用来监听以下crd obj的事件变化
Pilot = collection.NewSchemasBuilder(). MustAdd(IstioNetworkingV1Alpha3Destinationrules). MustAdd(IstioNetworkingV1Alpha3Envoyfilters). MustAdd(IstioNetworkingV1Alpha3Gateways). MustAdd(IstioNetworkingV1Alpha3Serviceentries). MustAdd(IstioNetworkingV1Alpha3Sidecars). MustAdd(IstioNetworkingV1Alpha3Virtualservices). MustAdd(IstioNetworkingV1Alpha3Workloadentries). MustAdd(IstioNetworkingV1Alpha3Workloadgroups). MustAdd(IstioSecurityV1Beta1Authorizationpolicies). MustAdd(IstioSecurityV1Beta1Peerauthentications). MustAdd(IstioSecurityV1Beta1Requestauthentications). Build()
confighandler的具体实现
// configHandler is the event handler registered for config changes. For every
// event it requests a full push keyed on the changed config, then records the
// resource with the status reporter (or removes it on delete). The previous
// config value is ignored.
configHandler := func(_, curr config.Config, event model.Event) {
	pushReq := &model.PushRequest{
		Full: true,
		ConfigsUpdated: map[model.ConfigKey]struct{}{{
			Kind:      curr.GroupVersionKind,
			Name:      curr.Name,
			Namespace: curr.Namespace,
		}: {}},
		Reason: []model.TriggerReason{model.ConfigUpdate},
	}
	s.XDSServer.ConfigUpdate(pushReq)
	// Track the resource as in progress unless it was deleted.
	if event != model.EventDelete {
		s.statusReporter.AddInProgressResource(curr)
	} else {
		s.statusReporter.DeleteInProgressResource(curr)
	}
}
WorkloadEntryHandler定义WorkloadEntry的handler
func (s *ServiceEntryStore) workloadEntryHandler(old, curr config.Config, event model.Event) { var oldWle *networking.WorkloadEntry if old.Spec != nil { oldWle = old.Spec.(*networking.WorkloadEntry) } wle := curr.Spec.(*networking.WorkloadEntry) key := configKey{ kind: workloadEntryConfigType, name: curr.Name, namespace: curr.Namespace, } if features.WorkloadEntryHealthChecks && !isHealthy(curr) { event = model.EventDelete } // fire off the k8s handlers if len(s.workloadHandlers) > 0 { si := convertWorkloadEntryToWorkloadInstance(curr) if si != nil { for _, h := range s.workloadHandlers { h(si, event) } } } s.storeMutex.RLock() // 获取同命名空间的entries entries := s.seWithSelectorByNamespace[curr.Namespace] s.storeMutex.RUnlock() // if there are no service entries, return now to avoid taking unnecessary locks if len(entries) == 0 { return } log.Debugf("Handle event %s for workload entry %s in namespace %s", event, curr.Name, curr.Namespace) instancesUpdated := []*model.ServiceInstance{} instancesDeleted := []*model.ServiceInstance{} workloadLabels := labels.Collection{wle.Labels} fullPush := false configsUpdated := map[model.ConfigKey]struct{}{} for _, se := range entries { selected := false // workloadentry不满足serviceentry label selector if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) { // 更新操作 if oldWle != nil { oldWorkloadLabels := labels.Collection{oldWle.Labels} // 如果老的workloadentry满足,则需要删除掉原有的endpoint if oldWorkloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) { selected = true instance := convertWorkloadEntryToServiceInstances(oldWle, se.services, se.entry, &key) instancesDeleted = append(instancesDeleted, instance...) } } } else { // 满足labelselector 更新为新的 selected = true instance := convertWorkloadEntryToServiceInstances(wle, se.services, se.entry, &key) instancesUpdated = append(instancesUpdated, instance...) 
} if selected { // serviceentry 解析方式为DNS,全量更新 if se.entry.Resolution == networking.ServiceEntry_DNS { fullPush = true for key, value := range getUpdatedConfigs(se.services) { configsUpdated[key] = value } } } } if len(instancesDeleted) > 0 { s.deleteExistingInstances(key, instancesDeleted) } if event != model.EventDelete { s.updateExistingInstances(key, instancesUpdated) } else { s.deleteExistingInstances(key, instancesUpdated) } // 非全量值更新eds if !fullPush { s.edsUpdate(append(instancesUpdated, instancesDeleted...), true) // trigger full xds push to the related sidecar proxy if event == model.EventAdd { s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address) } return } // 更新eds cache s.edsUpdate(append(instancesUpdated, instancesDeleted...), false) pushReq := &model.PushRequest{ Full: true, ConfigsUpdated: configsUpdated, Reason: []model.TriggerReason{model.EndpointUpdate}, } // 全量更新 ads s.XdsUpdater.ConfigUpdate(pushReq)}
serviceEntryHandler 为 service entries定义handler
func (s *ServiceEntryStore) serviceEntryHandler(old, curr config.Config, event model.Event) { // 转换为svc列表 cs := convertServices(curr) configsUpdated := map[model.ConfigKey]struct{}{} // 如果是添加/删除事件,始终进行完全推送。如果是update事件,则仅当服务已更改时,我们才应进行完全推送-否则,只需推送端点更新即可。 var addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs []*model.Service switch event { case model.EventUpdate: // 转换为svc列表 os := convertServices(old) // 对比新老serviceentry的workloadselector,变化则进行全量更新 if selectorChanged(old, curr) { // 更新所有的svc mark := make(map[host.Name]*model.Service, len(cs)) for _, svc := range cs { mark[svc.Hostname] = svc updatedSvcs = append(updatedSvcs, svc) } for _, svc := range os { if _, f := mark[svc.Hostname]; !f { updatedSvcs = append(updatedSvcs, svc) } } } else { // 对比差异 addedSvcs, deletedSvcs, updatedSvcs, unchangedSvcs = servicesDiff(os, cs) } case model.EventDelete: deletedSvcs = cs case model.EventAdd: addedSvcs = cs default: unchangedSvcs = cs } for _, svc := range addedSvcs { s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventAdd) configsUpdated[makeConfigKey(svc)] = struct{}{} } for _, svc := range updatedSvcs { s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventUpdate) configsUpdated[makeConfigKey(svc)] = struct{}{} } // 清理endpoint shards for _, svc := range deletedSvcs { s.XdsUpdater.SvcUpdate(s.Cluster(), string(svc.Hostname), svc.Attributes.Namespace, model.EventDelete) configsUpdated[makeConfigKey(svc)] = struct{}{} } if len(unchangedSvcs) > 0 { currentServiceEntry := curr.Spec.(*networking.ServiceEntry) oldServiceEntry := old.Spec.(*networking.ServiceEntry) // 解析dns 且ep变化则更新 if currentServiceEntry.Resolution == networking.ServiceEntry_DNS { if !reflect.DeepEqual(currentServiceEntry.Endpoints, oldServiceEntry.Endpoints) { // fqdn endpoints have changed. 
Need full push for _, svc := range unchangedSvcs { configsUpdated[makeConfigKey(svc)] = struct{}{} } } } } fullPush := len(configsUpdated) > 0 // 没有服务变化,即为update操作且没有更新svc if !fullPush { // STATIC服务条目中的IP端点已更改。我们需要EDS更新如果是全量推送,则将edsUpdate保留。我们应该对所有未更改的Svcs进行edsUpdate, instances := convertServiceEntryToInstances(curr, unchangedSvcs) key := configKey{ kind: serviceEntryConfigType, name: curr.Name, namespace: curr.Namespace, } //如果后端实例变更,为变更的实例更新索引 s.updateExistingInstances(key, instances) s.edsUpdate(instances, true) return } // 在这里重新计算索引太昂贵-需要时进行延迟构建。如果服务已更改,则仅重新计算索引。 s.refreshIndexes.Store(true) // 进行完全推送时,非DNS添加,更新,未更改的服务会触发eds更新,以便更新endpointshards。 allServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs)) nonDNSServices := make([]*model.Service, 0, len(addedSvcs)+len(updatedSvcs)+len(unchangedSvcs)) allServices = append(allServices, addedSvcs...) allServices = append(allServices, updatedSvcs...) allServices = append(allServices, unchangedSvcs...) for _, svc := range allServices { if svc.Resolution != model.DNSLB { nonDNSServices = append(nonDNSServices, svc) } } // 非dns服务 keys := map[instancesKey]struct{}{} for _, svc := range nonDNSServices { keys[instancesKey{hostname: svc.Hostname, namespace: curr.Namespace}] = struct{}{} } // 更新eds endpoint shards s.edsUpdateByKeys(keys, false) pushReq := &model.PushRequest{ Full: true, ConfigsUpdated: configsUpdated, Reason: []model.TriggerReason{model.ServiceUpdate}, } s.XdsUpdater.ConfigUpdate(pushReq)}
当workloadentry更新时触发k8s对应service的更新及缓存
func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) { // ignore malformed workload entries. And ignore any workload entry that does not have a label // as there is no way for us to select them if si.Namespace == "" || len(si.Endpoint.Labels) == 0 { return } // this is from a workload entry. Store it in separate map so that // the InstancesByPort can use these as well as the k8s pods. c.Lock() switch event { case model.EventDelete: delete(c.workloadInstancesByIP, si.Endpoint.Address) default: // add or update // Check to see if the workload entry changed. If it did, clear the old entry k := si.Namespace + "/" + si.Name existing := c.workloadInstancesIPsByName[k] if existing != si.Endpoint.Address { delete(c.workloadInstancesByIP, existing) } c.workloadInstancesByIP[si.Endpoint.Address] = si c.workloadInstancesIPsByName[k] = si.Endpoint.Address } c.Unlock() // find the workload entry's service by label selector // rather than scanning through our internal map of model.services, get the services via the k8s apis dummyPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels}, } // find the services that map to this workload entry, fire off eds updates if the service is of type client-side lb if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 { for _, k8sSvc := range k8sServices { var service *model.Service c.RLock() service = c.servicesMap[kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.domainSuffix)] c.RUnlock() // Note that this cannot be an external service because k8s external services do not have label selectors. if service == nil || service.Resolution != model.ClientSideLB { // may be a headless service continue } // Get the updated list of endpoints that includes k8s pods and the workload entries for this service // and then notify the EDS server that endpoints for this service have changed. 
// We need one endpoint object for each service port endpoints := make([]*model.IstioEndpoint, 0) for _, port := range service.Ports { if port.Protocol == protocol.UDP { continue } // Similar code as UpdateServiceShards in eds.go instances := c.InstancesByPort(service, port.Port, labels.Collection{}) for _, inst := range instances { endpoints = append(endpoints, inst.Endpoint) } } // fire off eds update c.xdsUpdater.EDSUpdate(c.clusterID, string(service.Hostname), service.Attributes.Namespace, endpoints) } }}
当k8spod变化时触发对应service entry的变化
// WorkloadInstanceHandler defines the handler for service instances generated by other registriesfunc (s *ServiceEntryStore) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) { key := configKey{ kind: workloadInstanceConfigType, name: wi.Name, namespace: wi.Namespace, } // Used to indicate if this event was fired for a pod->workloadentry conversion // and that the event can be ignored due to no relevant change in the workloadentry redundantEventForPod := false var addressToDelete string s.storeMutex.Lock() // this is from a pod. Store it in separate map so that // the refreshIndexes function can use these as well as the store ones. k := wi.Namespace + "/" + wi.Name switch event { case model.EventDelete: if _, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; !exists { // multiple delete events for the same pod (succeeded/failed/unknown status repeating). redundantEventForPod = true } else { delete(s.workloadInstancesByIP, wi.Endpoint.Address) delete(s.workloadInstancesIPsByName, k) } default: // add or update // Check to see if the workload entry changed. If it did, clear the old entry existing := s.workloadInstancesIPsByName[k] if existing != "" && existing != wi.Endpoint.Address { delete(s.workloadInstancesByIP, existing) addressToDelete = existing } if old, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; exists { // If multiple k8s services select the same pod or a service has multiple ports, // we may be getting multiple events ignore them as we only care about the Endpoint IP itself. if model.WorkloadInstancesEqual(old, wi) { // ignore the update as nothing has changed redundantEventForPod = true } } s.workloadInstancesByIP[wi.Endpoint.Address] = wi s.workloadInstancesIPsByName[k] = wi.Endpoint.Address } // We will only select entries in the same namespace entries := s.seWithSelectorByNamespace[wi.Namespace] s.storeMutex.Unlock() // nothing useful to do. 
if len(entries) == 0 || redundantEventForPod { return } log.Debugf("Handle event %s for service instance (from %s) in namespace %s", event, wi.Endpoint.Address, wi.Namespace) instances := []*model.ServiceInstance{} instancesDeleted := []*model.ServiceInstance{} for _, se := range entries { workloadLabels := labels.Collection{wi.Endpoint.Labels} if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) { // Not a match, skip this one continue } instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, se.services, se.entry) instances = append(instances, instance...) if addressToDelete != "" { for _, i := range instance { di := i.DeepCopy() di.Endpoint.Address = addressToDelete instancesDeleted = append(instancesDeleted, di) } } } if len(instancesDeleted) > 0 { s.deleteExistingInstances(key, instancesDeleted) } if event != model.EventDelete { s.updateExistingInstances(key, instances) } else { s.deleteExistingInstances(key, instances) } s.edsUpdate(instances, true)}
默认开启,当pod发生变化时进行调用
if m.serviceEntryStore != nil && features.EnableServiceEntrySelectPods {
	// Add an instance handler in the kubernetes registry to notify the
	// service entry store about pod events.
	kubeRegistry.AppendWorkloadHandler(m.serviceEntryStore.WorkloadInstanceHandler)
}
具体处理逻辑
// WorkloadInstanceHandler defines the handler for service instances generated by other registriesfunc (s *ServiceEntryStore) WorkloadInstanceHandler(wi *model.WorkloadInstance, event model.Event) { key := configKey{ kind: workloadInstanceConfigType, name: wi.Name, namespace: wi.Namespace, } // 用于标明是否为pod-> workloadentry转换触发了此事件,并且由于工作负载项没有相关更改而可以忽略该事件 redundantEventForPod := false var addressToDelete string s.storeMutex.Lock() // 从pod里获取 将其存储在单独的映射中,以便refreshIndexes函数可以使用存储的对象 k := wi.Namespace + "/" + wi.Name switch event { case model.EventDelete: if _, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; !exists { // 同一pod的多个删除事件(成功/失败/未知状态重复) redundantEventForPod = true } else { delete(s.workloadInstancesByIP, wi.Endpoint.Address) delete(s.workloadInstancesIPsByName, k) } default: // add or update // 检查是否更新 existing := s.workloadInstancesIPsByName[k] if existing != "" && existing != wi.Endpoint.Address { delete(s.workloadInstancesByIP, existing) addressToDelete = existing } if old, exists := s.workloadInstancesByIP[wi.Endpoint.Address]; exists { // 如果多个k8s服务选择同一个Pod或一个服务具有多个端口,则由于我们只关心端点IP本身,我们可能会收到多个事件忽略它们。 if model.WorkloadInstancesEqual(old, wi) { redundantEventForPod = true } } s.workloadInstancesByIP[wi.Endpoint.Address] = wi s.workloadInstancesIPsByName[k] = wi.Endpoint.Address } // 只过滤同命名空间的对象 entries := s.seWithSelectorByNamespace[wi.Namespace] s.storeMutex.Unlock() if len(entries) == 0 || redundantEventForPod { return } instances := []*model.ServiceInstance{} instancesDeleted := []*model.ServiceInstance{} for _, se := range entries { workloadLabels := labels.Collection{wi.Endpoint.Labels} // 不满足筛选条件则跳过 if !workloadLabels.IsSupersetOf(se.entry.WorkloadSelector.Labels) { continue } // 添加新的instance instance := convertWorkloadInstanceToServiceInstance(wi.Endpoint, se.services, se.entry) instances = append(instances, instance...) 
// 删除原有的 if addressToDelete != "" { for _, i := range instance { di := i.DeepCopy() di.Endpoint.Address = addressToDelete instancesDeleted = append(instancesDeleted, di) } } } // 更新缓存 if len(instancesDeleted) > 0 { s.deleteExistingInstances(key, instancesDeleted) } if event != model.EventDelete { s.updateExistingInstances(key, instances) } else { s.deleteExistingInstances(key, instances) } // 触发eds s.edsUpdate(instances, true)}
默认开启
if localCluster {
	// TODO implement deduping in aggregate registry to allow multiple k8s registries to handle WorkloadEntry
	if m.serviceEntryStore != nil && features.EnableK8SServiceSelectWorkloadEntries {
		// Add an instance handler in the service entry store to notify kubernetes about workload entry events
		m.serviceEntryStore.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
	}
}
// WorkloadInstanceHandler定义其他registry生成服务实例的处理程序func (c *Controller) WorkloadInstanceHandler(si *model.WorkloadInstance, event model.Event) { // 忽略命名空间或label为空的情况 if si.Namespace == "" || len(si.Endpoint.Labels) == 0 { return } // 更新缓存 c.Lock() switch event { case model.EventDelete: delete(c.workloadInstancesByIP, si.Endpoint.Address) default: k := si.Namespace + "/" + si.Name existing := c.workloadInstancesIPsByName[k] if existing != si.Endpoint.Address { delete(c.workloadInstancesByIP, existing) } c.workloadInstancesByIP[si.Endpoint.Address] = si c.workloadInstancesIPsByName[k] = si.Endpoint.Address } c.Unlock() // ,通过k8s api获取服务 dummyPod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Namespace: si.Namespace, Labels: si.Endpoint.Labels}, } // 找到映射到该工作负载条目的服务,如果该服务属于客户端lb类型,则触发eds更新 if k8sServices, err := getPodServices(c.serviceLister, dummyPod); err == nil && len(k8sServices) > 0 { for _, k8sSvc := range k8sServices { var service *model.Service c.RLock() service = c.servicesMap[kube.ServiceHostname(k8sSvc.Name, k8sSvc.Namespace, c.domainSuffix)] c.RUnlock() // 不能是外部服务,因为k8s外部服务没有标签选择器。 if service == nil || service.Resolution != model.ClientSideLB { // 可能是headless svc continue } // 获取包括该服务的k8s pod和工作负载条目的endpoints的更新列表,然后通知EDS服务器该服务的端点已更改。每个服务端口需要一个endpoint对象 endpoints := make([]*model.IstioEndpoint, 0) for _, port := range service.Ports { if port.Protocol == protocol.UDP { continue } instances := c.InstancesByPort(service, port.Port, labels.Collection{}) for _, inst := range instances {n endpoints = append(endpoints, inst.Endpoint) } } // 触发EDS更新 c.xdsUpdater.EDSUpdate(c.clusterID, string(service.Hostname), service.Attributes.Namespace, endpoints) } }}