前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊dbsync的jobs

聊聊dbsync的jobs

作者头像
code4it
发布2021-04-29 17:19:23
4210
发布2021-04-29 17:19:23
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下dbsync的jobs

Job

代码语言:javascript
复制
//Job represents db sync job
type Job struct {
    ID        string
    Error     string
    Status    string
    Progress  Progress
    Items     []*Transferable
    Chunked   bool
    mutex     *sync.Mutex
    StartTime time.Time
    EndTime   *time.Time
}

//NewJob creates a new job
func NewJob(id string) *Job {
    return &Job{
        ID:        id,
        StartTime: time.Now(),
        mutex:     &sync.Mutex{},
        Items:     make([]*Transferable, 0),
    }
}

Job方法定义了ID、Error、Status、Progress、Items、Chunked、mutex、StartTime、EndTime

Update

代码语言:javascript
复制
//Update updates job progress
func (j *Job) Update() {
    if len(j.Items) == 0 {
        return
    }
    sourceCount := 0
    destCount := 0
    transferred := 0
    for i := range j.Items {
        if j.Items[i].Status == nil {
            continue
        }
        sourceCount += j.Items[i].Source.Count()
        destCount += j.Items[i].Dest.Count()
        transferred += int(atomic.LoadUint32(&j.Items[i].Transferred))

    }
    j.Progress.Transferred = transferred
    j.Progress.SourceCount = sourceCount
    j.Progress.DestCount = destCount
    if sourceCount > 0 {
        j.Progress.Pct = transferred / sourceCount
    }
}

Update方法遍历Items,统计transferred、sourceCount、destCount

Done

代码语言:javascript
复制
//Done flag job as done
func (j *Job) Done(now time.Time) {
    if j.Status != shared.StatusError {
        j.Status = shared.StatusDone
    }
    j.EndTime = &now
}

Done方法更新Status和EndTime

Add

代码语言:javascript
复制
//Add add transferable
func (j *Job) Add(transferable *Transferable) {
    j.mutex.Lock()
    defer j.mutex.Unlock()
    j.Items = append(j.Items, transferable)
}

Add方法往transferable添加Items

IsRunning

代码语言:javascript
复制
//IsRunning returns true if jos has running status
func (j *Job) IsRunning() bool {
    return j.Status == shared.StatusRunning || j.EndTime == nil
}

IsRunning方法通过status和EndTime来判断是否是running

Service

代码语言:javascript
复制
//Service represents a job service
type Service interface {
    //List lists all active or recently active jobs
    List(request *ListRequest) *ListResponse
    //Create creates a new job
    Create(ID string) *core.Job
    //Get returns a job for supplied ID or nil
    Get(ID string) *core.Job
}

type service struct {
    registry *registry
}

//New create a job service
func New() Service {
    return &service{
        registry: newRegistry(),
    }
}

Service接口定义了List、Create、Get

Get

代码语言:javascript
复制
//Get returns job by ID or nil
func (s *service) Get(ID string) *core.Job {
    jobs := s.registry.list()
    for i := range jobs {
        if jobs[i].ID == ID {
            jobs[i].Update()
            return jobs[i]
        }
    }
    return nil

}

Get方法先执行registry.list(),然后遍历list找到ID对应的job,然后执行Update

List

代码语言:javascript
复制
//List lists all jobs
func (s *service) List(request *ListRequest) *ListResponse {
    jobs := s.registry.list()
    if len(request.IDs) == 0 {
        return &ListResponse{
            Jobs: jobs,
        }
    }
    var requestedIDs = make(map[string]bool)
    for i := range request.IDs {
        requestedIDs[request.IDs[i]] = true
    }
    var filtered = make([]*core.Job, 0)
    for i := range jobs {
        if _, has := requestedIDs[jobs[i].ID]; !has {
            continue
        }
        jobs[i].Update()
        filtered = append(filtered, jobs[i])
    }
    return &ListResponse{
        Jobs: filtered,
    }
}

List方法先执行registry.list(),之后根据requestedIDs找出对应的job,执行Update,最后返回

Create

代码语言:javascript
复制
//Create creates a new job
func (s *service) Create(ID string) *core.Job {
    job := core.NewJob(ID)
    s.registry.add(job)
    return job
}

Create方法通过core.NewJob(ID)创建job,然后执行registry.add(job)

小结

dbsync的Schedulable定义了URL、ID、*contract.Sync、Schedule、Status、status属性,它提供了Clone、Done、IsRunning、ScheduleNexRun、Init、Validate方法。

doc

  • dbsync
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Job
    • Update
      • Done
        • Add
          • IsRunning
          • Service
          • Get
          • List
          • Create
          • 小结
          • doc
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档