前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >KubeEdge - edgecore metamanager模块源码分析

KubeEdge - edgecore metamanager模块源码分析

作者头像
有点技术
发布2020-07-14 14:58:41
7470
发布2020-07-14 14:58:41
举报
文章被收录于专栏:有点技术有点技术有点技术

metamanager

Start

定期发送MetaSync操作消息,以同步在边缘节点上运行的Pod的状态。同步间隔可在conf/edge.yaml中配置

func (m *metaManager) Start() {    var ctx context.Context    ctx, m.cancel = context.WithCancel(context.Background())    InitMetaManagerConfig()
    go func() {        period := getSyncInterval()        timer := time.NewTimer(period)        for {            select {            case <-ctx.Done():                klog.Warning("MetaManager stop")                return            case <-timer.C:                timer.Reset(period)                msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)                beehiveContext.Send(MetaManagerModuleName, *msg)            }        }    }()
    m.runMetaManager(ctx)}

runMetaManager

runMetaManager从beehive中读取metamanager的消息,交给process处理

func (m *metaManager) runMetaManager(ctx context.Context) {    go func() {        for {            select {            case <-ctx.Done():                klog.Warning("MetaManager mainloop stop")                return            default:
            }            if msg, err := beehiveContext.Receive(m.Name()); err == nil {                klog.Infof("get a message %+v", msg)                m.process(msg)            } else {                klog.Errorf("get a message %+v: %v", msg, err)            }        }    }()}

process

判断动作,然后执行相对应的操作

func (m *metaManager) process(message model.Message) {    operation := message.GetOperation()    switch operation {    case model.InsertOperation:        m.processInsert(message)    case model.UpdateOperation:        m.processUpdate(message)    case model.DeleteOperation:        m.processDelete(message)    case model.QueryOperation:        m.processQuery(message)    case model.ResponseOperation:        m.processResponse(message)    case messagepkg.OperationNodeConnection:        m.processNodeConnection(message)    case OperationMetaSync:        m.processSync(message)    case OperationFunctionAction:        m.processFunctionAction(message)    case OperationFunctionActionResult:        m.processFunctionActionResult(message)    case constants.CSIOperationTypeCreateVolume,        constants.CSIOperationTypeDeleteVolume,        constants.CSIOperationTypeControllerPublishVolume,        constants.CSIOperationTypeControllerUnpublishVolume:        m.processVolume(message)    }}

insert

  • insert获取了消息的内容
  • 使用parseResource,根据resource返回资源类型,资源ID,
// Resource format: <namespace>/<restype>[/resid]// return <reskey, restype, resid>func parseResource(resource string) (string, string, string) {    tokens := strings.Split(resource, constants.ResourceSep)    resType := ""    resID := ""    switch len(tokens) {    case 2:        resType = tokens[len(tokens)-1]    case 3:        resType = tokens[len(tokens)-2]        resID = tokens[len(tokens)-1]    default:    }    return resource, resType, resID}
  • 将数据存入数据库,
  • 判断资源类型是service和endpoint的消息发送给edgemesh,其他的发送给Edged
  • 最后将所有的insert消息发送给edgehub

update

  • 当资源类型为 servicelist endpointslist podlist时需要更新数据库,发送消息给edgemesh,发送消息到edgehub
  • 当资源类型为其他类型时首先判断资源是否变化(实际上只判断了pod的状态),代码如下
func resourceUnchanged(resType string, resKey string, content []byte) bool {    if resType == model.ResourceTypePodStatus {        dbRecord, err := dao.QueryMeta("key", resKey)        if err == nil && len(*dbRecord) > 0 && string(content) == (*dbRecord)[0] {            return true        }    }
    return false}
  • 更新数据库
  • 当来源是edged时消息发送给cloud和edged
  • 当消息来源是edgecontroller时,判断资源类型是否为service或endpoint类型。如果是发送消息给edgemesh,不是则发送消息给edged
  • 当来源是funcmgr时,发送消息给edgefunction
  • 当来源是edgefunction时消息发送给edgehub

delete

  • 删除数据库内容
  • 转发消息给edged
  • 返回响应到edgehub

query

  • 判断资源类型是否依赖远端查询,且远端为已连接状态且连接到云端,
  • 当资源在本地查询失败,或者资源类型为node或类型为volume attachment
    • 则向云端发送查询请求,将返回信息存储,并根据类型(service,endpoints)发送消息给edgemesh还是edged
  • 其他资源直接由metamanager返回,并通知edged同步状态
  • 当资源不需要远端查询,则在本地查询进行返回

response

  • 当来源是云端,判断类型是否为service和endpoint,则发送消息给edgemesh,否则发给edged
  • 不是云端则发给云端

node/connection

  • 设置云端连接状态

meta-internal-sync

用于同步pod状态

  • 当pod在db中无记录则跳过
  • 当有pod状态记录无pod记录时,删除pod状态记录
  • 将所有的pod状态记录发送到云端

action

本地保存后将消息发送给,edgefunction

action_result

本地保存函数执行结果,返回给云端

createvolume、deletevolume、controllerpublishvolume、controllerunpublishvolume

发消息给edged,返回结果传给云端

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 有点技术 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • metamanager
  • Start
  • runMetaManager
  • process
  • insert
  • update
  • delete
  • query
  • response
  • node/connection
  • meta-internal-sync
  • action
  • action_result
  • createvolume、deletevolume、controllerpublishvolume、controllerunpublishvolume
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档