首页
学习
活动
专区
圈层
工具
发布

KubeEdge - edgecore edgehub模块源码分析

edgehub 源码分析

edgehub是Edge上的通信接口模块,用于云边消息同步

结构定义及初始化

edgehub的结构定义

代码语言:javascript
代码运行次数:0
复制
type EdgeHub struct {    context       *beehiveContext.Context    chClient      clients.Adapter    config        *config.ControllerConfig    reconnectChan chan struct{}    cancel        context.CancelFunc    syncKeeper    map[string]chan model.Message    keeperLock    sync.RWMutex}

在注册edgehub模块的时间 对edgehub进行了初始化

代码语言:javascript
代码运行次数:0
复制
func Register() {    core.Register(&EdgeHub{        config:        &config.GetConfig().CtrConfig,        reconnectChan: make(chan struct{}),        syncKeeper:    make(map[string]chan model.Message),    })}

在模块启动时先将拿到beehiveContext,然后获取EdgehubConfig

在启动时间加载ControllerConfig 根据使用的ControllerConfig中的protocol加载对应的websocket config或者quicconfig

代码语言:javascript
代码运行次数:0
复制
func (eh *EdgeHub) initial() (err error) {    config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue("edgehub.websocket.url").ToString()    if err != nil {        klog.Warningf("failed to get cloud hub url, error:%+v", err)        return err    }
    cloudHubClient, err := clients.GetClient(eh.config.Protocol, config.GetConfig())    if err != nil {        return err    }
    eh.chClient = cloudHubClient
    return nil}

然后初始化EdgeHub.Controller.chClient 配置,然后初始化对应连接用于和cloudcore通信

给其他组件同步连接成功状态

然后通过pubConnectInfo给其他的组(resource,twin,func,user)发消息,告诉他们 云端连接成功了

代码语言:javascript
代码运行次数:0
复制
func (eh *EdgeHub) pubConnectInfo(isConnected bool) {    // var info model.Message    content := connect.CloudConnected    if !isConnected {        content = connect.CloudDisconnected    }
    for _, group := range groupMap {        message := model.NewMessage("").BuildRouter(message.SourceNodeConnection, group,            message.ResourceTypeNodeConnection, message.OperationNodeConnection).FillBody(content)        eh.context.SendToGroup(group, *message)    }}

message结构定义如下

type Message struct { Header MessageHeader json:"header"Router MessageRoute json:"route,omitempty"Content interface{} json:"content"}

header包含了

  • 消息的ID
  • 消息的父ID
  • 时间戳
  • 是否被同步

router 定义了以下对象

  • 来源
  • 广播到哪个组
  • 动作
  • 操作的资源

发消息

我们可以看到model.NewMessage("").BuildRouter接收四个参数,分别为:

  • 来源
  • 发给哪个组
  • 资源类型
  • 动作

这里的NewMessage parentID参数为空 证明这是消息的发起者

代码语言:javascript
代码运行次数:0
复制
func NewMessage(parentID string) *Message {    msg := &Message{}    msg.Header.ID = uuid.NewV4().String()    msg.Header.ParentID = parentID    msg.Header.Timestamp = time.Now().UnixNano() / 1e6    return msg}

接下来启动了三个协程

  • routeToEdge
  • routeToCloud
  • keepalive

routeToEdge

routeToEdge接收信息 然后发送信息到对应的group, 判断group是否存在,判断是否是已有同步响应,如果没有发送给对应组。这里就使用到了beehive的messageContext. 接下来根据parentid将此条消息发送到syncKeeper channel里

代码语言:javascript
代码运行次数:0
复制
func (ehc *Controller) routeToEdge() {    for {        message, err := ehc.chClient.Receive()        if err != nil {            klog.Errorf("websocket read error: %v", err)            ehc.stopChan <- struct{}{}            return        }
        klog.Infof("received msg from cloud-hub:%+v", message)        err = ehc.dispatch(message)        if err != nil {            klog.Errorf("failed to dispatch message, discard: %v", err)        }    }}

routeToCloud

将在channel收到的消息发送到云端,同时将消息保存在syncKeeper,这里创建了一个定时器,过期的话会自动删除

代码语言:javascript
代码运行次数:0
复制
func (ehc *Controller) sendToCloud(message model.Message) error {    ehc.keeperLock.Lock()    err := ehc.chClient.Send(message)    ehc.keeperLock.Unlock()    if err != nil {        klog.Errorf("failed to send message: %v", err)        return fmt.Errorf("failed to send message, error: %v", err)    }
    syncKeep := func(message model.Message) {        tempChannel := ehc.addKeepChannel(message.GetID())        sendTimer := time.NewTimer(ehc.config.HeartbeatPeriod)        select {        case response := <-tempChannel:            sendTimer.Stop()            ehc.context.SendResp(response)            ehc.deleteKeepChannel(response.GetParentID())        case <-sendTimer.C:            klog.Warningf("timeout to receive response for message: %+v", message)            ehc.deleteKeepChannel(message.GetID())        }    }
    if message.IsSync() {        go syncKeep(message)    }
    return nil}

keepalive

根据心跳时间向云端发送心跳

代码语言:javascript
代码运行次数:0
复制
func (ehc *Controller) keepalive() {    for {        msg := model.NewMessage("").            BuildRouter(ModuleNameEdgeHub, "resource", "node", "keepalive").            FillBody("ping")
        // post message to cloud hub        err := ehc.sendToCloud(*msg)        if err != nil {            klog.Errorf("websocket write error: %v", err)            ehc.stopChan <- struct{}{}            return        }
        time.Sleep(ehc.config.HeartbeatPeriod)    }}
下一篇
举报
领券