前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊nacos-sdk-go的HostReactor

聊聊nacos-sdk-go的HostReactor

作者头像
code4it
发布2020-07-02 14:15:45
3350
发布2020-07-02 14:15:45
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下nacos-sdk-go的HostReactor

HostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

type HostReactor struct {
    serviceInfoMap       cache.ConcurrentMap
    cacheDir             string
    updateThreadNum      int
    serviceProxy         NamingProxy
    pushReceiver         PushReceiver
    subCallback          SubscribeCallback
    updateTimeMap        cache.ConcurrentMap
    updateCacheWhenEmpty bool
}
  • HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性

NewHostReactor

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func NewHostReactor(serviceProxy NamingProxy, cacheDir string, updateThreadNum int, notLoadCacheAtStart bool, subCallback SubscribeCallback, updateCacheWhenEmpty bool) HostReactor {
    if updateThreadNum <= 0 {
        updateThreadNum = Default_Update_Thread_Num
    }
    hr := HostReactor{
        serviceProxy:         serviceProxy,
        cacheDir:             cacheDir,
        updateThreadNum:      updateThreadNum,
        serviceInfoMap:       cache.NewConcurrentMap(),
        subCallback:          subCallback,
        updateTimeMap:        cache.NewConcurrentMap(),
        updateCacheWhenEmpty: updateCacheWhenEmpty,
    }
    pr := NewPushRecevier(&hr)
    hr.pushReceiver = *pr
    if !notLoadCacheAtStart {
        hr.loadCacheFromDisk()
    }
    go hr.asyncUpdateService()
    return hr
}
  • NewHostReactor方法创建HostReactor,然后通过NewPushRecevier创建pushReceiver,对于notLoadCacheAtStart为false的则执行loadCacheFromDisk,之后异步执行asyncUpdateService

loadCacheFromDisk

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) loadCacheFromDisk() {
    serviceMap := cache.ReadServicesFromFile(hr.cacheDir)
    if serviceMap == nil || len(serviceMap) == 0 {
        return
    }
    for k, v := range serviceMap {
        hr.serviceInfoMap.Set(k, v)
    }
}
  • loadCacheFromDisk方法通过cache.ReadServicesFromFile(hr.cacheDir)获取serviceMap

asyncUpdateService

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) asyncUpdateService() {
    sema := utils.NewSemaphore(hr.updateThreadNum)
    for {
        for _, v := range hr.serviceInfoMap.Items() {
            service := v.(model.Service)
            lastRefTime, ok := hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))
            if !ok {
                lastRefTime = uint64(0)
            }
            if uint64(utils.CurrentMillis())-lastRefTime.(uint64) > service.CacheMillis {
                sema.Acquire()
                go func() {
                    hr.updateServiceNow(service.Name, service.Clusters)
                    sema.Release()
                }()
            }
        }
        time.Sleep(1 * time.Second)
    }

}
  • asyncUpdateService方法遍历serviceInfoMap,通过hr.updateTimeMap.Get(utils.GetServiceCacheKey(service.Name, service.Clusters))获取lastRefTime,然后判断是否超过service.CacheMillis,超过的haul则执行sema.Acquire(),异步hr.updateServiceNow(service.Name, service.Clusters),最后执行sema.Release()

updateServiceNow

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) updateServiceNow(serviceName string, clusters string) {
    result, err := hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)
    if err != nil {
        log.Printf("[ERROR]:query list return error!servieName:%s cluster:%s  err:%s \n", serviceName, clusters, err.Error())
        return
    }
    if result == "" {
        log.Printf("[ERROR]:query list is empty!servieName:%s cluster:%s \n", serviceName, clusters)
        return
    }
    hr.ProcessServiceJson(result)
}
  • updateServiceNow方法通过hr.serviceProxy.QueryList(serviceName, clusters, hr.pushReceiver.port, false)获取json,然后通过hr.ProcessServiceJson(result)解析json

ProcessServiceJson

nacos-sdk-go-v0.3.2/clients/naming_client/host_reator.go

func (hr *HostReactor) ProcessServiceJson(result string) {
    service := utils.JsonToService(result)
    if service == nil {
        return
    }
    cacheKey := utils.GetServiceCacheKey(service.Name, service.Clusters)

    oldDomain, ok := hr.serviceInfoMap.Get(cacheKey)
    if ok && !hr.updateCacheWhenEmpty {
        //if instance list is empty,not to update cache
        if service.Hosts == nil || len(service.Hosts) == 0 {
            log.Printf("[ERROR]:do not have useful host, ignore it, name:%s \n", service.Name)
            return
        }
    }
    hr.updateTimeMap.Set(cacheKey, uint64(utils.CurrentMillis()))
    hr.serviceInfoMap.Set(cacheKey, *service)
    if !ok || ok && !reflect.DeepEqual(service.Hosts, oldDomain.(model.Service).Hosts) {
        if !ok {
            log.Println("[INFO] service not found in cache " + cacheKey)
        } else {
            log.Printf("[INFO] service key:%s was updated to:%s \n", cacheKey, utils.ToJsonString(service))
        }
        cache.WriteServicesToFile(*service, hr.cacheDir)
        hr.subCallback.ServiceChanged(service)
    }
}
  • ProcessServiceJson方法通过utils.JsonToService(result)将json解析为model.Service,然后通过utils.GetServiceCacheKey(service.Name, service.Clusters)构建cacheKey,之后更新hr.updateTimeMap、hr.serviceInfoMap;对于缓存不存在或者缓存存在变更的则执行cache.WriteServicesToFile(*service, hr.cacheDir),然后触发hr.subCallback.ServiceChanged(service)

小结

HostReactor定义了serviceInfoMap、cacheDir、updateThreadNum、serviceProxy、pushReceiver、subCallback、updateTimeMap、updateCacheWhenEmpty属性

doc

  • host_reator
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • HostReactor
  • NewHostReactor
  • loadCacheFromDisk
  • asyncUpdateService
  • updateServiceNow
  • ProcessServiceJson
  • 小结
  • doc
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档