定期发送MetaSync操作消息,以同步在边缘节点上运行的Pod的状态。同步间隔可在conf/edge.yaml中配置
func (m *metaManager) Start() { var ctx context.Context ctx, m.cancel = context.WithCancel(context.Background()) InitMetaManagerConfig()
go func() { period := getSyncInterval() timer := time.NewTimer(period) for { select { case <-ctx.Done(): klog.Warning("MetaManager stop") return case <-timer.C: timer.Reset(period) msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync) beehiveContext.Send(MetaManagerModuleName, *msg) } } }()
m.runMetaManager(ctx)}
runMetaManager从beehive中读取metamanager的消息,交给process处理
func (m *metaManager) runMetaManager(ctx context.Context) { go func() { for { select { case <-ctx.Done(): klog.Warning("MetaManager mainloop stop") return default:
} if msg, err := beehiveContext.Receive(m.Name()); err == nil { klog.Infof("get a message %+v", msg) m.process(msg) } else { klog.Errorf("get a message %+v: %v", msg, err) } } }()}
判断动作,然后执行相对应的操作
func (m *metaManager) process(message model.Message) { operation := message.GetOperation() switch operation { case model.InsertOperation: m.processInsert(message) case model.UpdateOperation: m.processUpdate(message) case model.DeleteOperation: m.processDelete(message) case model.QueryOperation: m.processQuery(message) case model.ResponseOperation: m.processResponse(message) case messagepkg.OperationNodeConnection: m.processNodeConnection(message) case OperationMetaSync: m.processSync(message) case OperationFunctionAction: m.processFunctionAction(message) case OperationFunctionActionResult: m.processFunctionActionResult(message) case constants.CSIOperationTypeCreateVolume, constants.CSIOperationTypeDeleteVolume, constants.CSIOperationTypeControllerPublishVolume, constants.CSIOperationTypeControllerUnpublishVolume: m.processVolume(message) }}
// Resource format: <namespace>/<restype>[/resid]// return <reskey, restype, resid>func parseResource(resource string) (string, string, string) { tokens := strings.Split(resource, constants.ResourceSep) resType := "" resID := "" switch len(tokens) { case 2: resType = tokens[len(tokens)-1] case 3: resType = tokens[len(tokens)-2] resID = tokens[len(tokens)-1] default: } return resource, resType, resID}
func resourceUnchanged(resType string, resKey string, content []byte) bool { if resType == model.ResourceTypePodStatus { dbRecord, err := dao.QueryMeta("key", resKey) if err == nil && len(*dbRecord) > 0 && string(content) == (*dbRecord)[0] { return true } }
return false}
用于同步pod状态
本地保存后将消息发送给,edgefunction
本地保存函数执行结果,返回给云端
发消息给edged,返回结果传给云端