edgehub是Edge上的通信接口模块,用于云边消息同步
edgehub的结构定义
// EdgeHub holds the state of the edge-side hub module: its messaging context,
// the client used to talk to the cloud hub, and bookkeeping for pending
// synchronous messages.
type EdgeHub struct {
	// context is the beehive context; pubConnectInfo uses it to send messages to module groups.
	context *beehiveContext.Context
	// chClient is the cloud hub client; initial assigns it from clients.GetClient.
	chClient clients.Adapter
	// config is the controller configuration; Register points it at the global CtrConfig.
	config *config.ControllerConfig
	// reconnectChan is an unbuffered signal channel created in Register.
	// NOTE(review): presumably used to trigger a reconnect to the cloud — confirm against its readers.
	reconnectChan chan struct{}
	// cancel is a context cancel function.
	// NOTE(review): where it is set and invoked is not visible here — confirm.
	cancel context.CancelFunc
	// syncKeeper maps a string key to a channel of messages.
	// NOTE(review): presumably keyed by message ID for pending sync responses — confirm.
	syncKeeper map[string]chan model.Message
	// keeperLock is a read/write mutex.
	// NOTE(review): presumably guards syncKeeper — confirm.
	keeperLock sync.RWMutex
}
在注册edgehub模块的时候,对edgehub进行了初始化
func Register() { core.Register(&EdgeHub{ config: &config.GetConfig().CtrConfig, reconnectChan: make(chan struct{}), syncKeeper: make(map[string]chan model.Message), })}
在模块启动时,先拿到beehiveContext,然后获取EdgehubConfig
在启动时加载ControllerConfig,并根据ControllerConfig中的protocol加载对应的websocket config或者quic config
func (eh *EdgeHub) initial() (err error) { config.GetConfig().WSConfig.URL, err = bhconfig.CONFIG.GetValue("edgehub.websocket.url").ToString() if err != nil { klog.Warningf("failed to get cloud hub url, error:%+v", err) return err }
cloudHubClient, err := clients.GetClient(eh.config.Protocol, config.GetConfig()) if err != nil { return err }
eh.chClient = cloudHubClient
return nil}
然后初始化EdgeHub.Controller.chClient 配置,然后初始化对应连接用于和cloudcore通信
然后通过pubConnectInfo给其他的组(resource、twin、func、user)发消息,告诉它们云端连接成功了;连接断开时也通过它发送断开通知(CloudDisconnected)
func (eh *EdgeHub) pubConnectInfo(isConnected bool) { // var info model.Message content := connect.CloudConnected if !isConnected { content = connect.CloudDisconnected }
for _, group := range groupMap { message := model.NewMessage("").BuildRouter(message.SourceNodeConnection, group, message.ResourceTypeNodeConnection, message.OperationNodeConnection).FillBody(content) eh.context.SendToGroup(group, *message) }}
message结构定义如下
type Message struct { Header MessageHeader json:"header"
Router MessageRoute json:"route,omitempty"
Content interface{} json:"content"
}
header包含了ID、ParentID、Timestamp等字段(这些字段在下面的NewMessage中赋值)
router 定义了以下对象
我们可以看到model.NewMessage("").BuildRouter接收四个参数,从pubConnectInfo中的调用来看,分别为:消息来源(source)、目标组(group)、资源类型(resource)、操作类型(operation)
这里的NewMessage parentID参数为空 证明这是消息的发起者
func NewMessage(parentID string) *Message { msg := &Message{} msg.Header.ID = uuid.NewV4().String() msg.Header.ParentID = parentID msg.Header.Timestamp = time.Now().UnixNano() / 1e6 return msg}
routeToEdge从云端接收消息,然后把消息分发到对应的group:先判断group是否存在,再判断这条消息是否是某个同步消息的响应;如果不是,就直接发送给对应的组(这里用到了beehive的messageContext),如果是,则根据parentID将这条消息发送到syncKeeper中对应的channel里
func (ehc *Controller) routeToEdge() { for { message, err := ehc.chClient.Receive() if err != nil { klog.Errorf("websocket read error: %v", err) ehc.stopChan <- struct{}{} return }
klog.Infof("received msg from cloud-hub:%+v", message) err = ehc.dispatch(message) if err != nil { klog.Errorf("failed to dispatch message, discard: %v", err) } }}
将在channel收到的消息发送到云端;如果是同步消息,还会在syncKeeper中为它保存一个等待响应的channel,并创建一个定时器(时长为HeartbeatPeriod),超时未收到响应时会自动删除
func (ehc *Controller) sendToCloud(message model.Message) error { ehc.keeperLock.Lock() err := ehc.chClient.Send(message) ehc.keeperLock.Unlock() if err != nil { klog.Errorf("failed to send message: %v", err) return fmt.Errorf("failed to send message, error: %v", err) }
syncKeep := func(message model.Message) { tempChannel := ehc.addKeepChannel(message.GetID()) sendTimer := time.NewTimer(ehc.config.HeartbeatPeriod) select { case response := <-tempChannel: sendTimer.Stop() ehc.context.SendResp(response) ehc.deleteKeepChannel(response.GetParentID()) case <-sendTimer.C: klog.Warningf("timeout to receive response for message: %+v", message) ehc.deleteKeepChannel(message.GetID()) } }
if message.IsSync() { go syncKeep(message) }
return nil}
根据心跳时间向云端发送心跳
// keepalive sends a "ping" message to the cloud once per HeartbeatPeriod.
//
// Each ping is a new message routed from ModuleNameEdgeHub with group
// "resource", resource "node" and operation "keepalive". If sending fails,
// the error is logged, a stop signal is written to stopChan, and the loop
// ends.
func (ehc *Controller) keepalive() {
	for {
		ping := model.NewMessage("").
			BuildRouter(ModuleNameEdgeHub, "resource", "node", "keepalive").
			FillBody("ping")

		// Post the heartbeat to the cloud hub.
		if err := ehc.sendToCloud(*ping); err != nil {
			klog.Errorf("websocket write error: %v", err)
			// Blocks until a receiver takes the stop signal.
			ehc.stopChan <- struct{}{}
			return
		}

		time.Sleep(ehc.config.HeartbeatPeriod)
	}
}