
A look at kingbus's binlog_progress.go

Original article
Author: code4it
Last modified: 2020-06-23 10:25:18
Included in the column: 码匠的流水账

This article takes a look at kingbus's binlog_progress.go.

BinlogProgress

kingbus/server/binlog_progress.go

//BinlogProgress is the progress of receiving binlog
type BinlogProgress struct {
    currentGtid  *atomic.String
    lastSaveGtid string
    //for heartbeat event
    lastBinlogFile     *atomic.String
    lastFilePosition   *atomic.Uint32
    executedGtidSetStr *atomic.String

    trxBoundaryParser *mysql.TransactionBoundaryParser

    persistentTime         time.Time
    persistentAppliedIndex uint64
    executedGtidSet        gomysql.GTIDSet
    store                  storage.Storage
}
  • BinlogProgress defines the fields currentGtid, lastSaveGtid, lastBinlogFile, lastFilePosition, executedGtidSetStr, trxBoundaryParser, persistentTime, persistentAppliedIndex, executedGtidSet, and store
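
Several of these fields are wrapped in atomic types, and the struct comment marks lastBinlogFile and lastFilePosition as being "for heartbeat event", which suggests they are read from a goroutine other than the one that applies events. The reader side is not shown in this file, so the following is only a minimal sketch of that pattern. It assumes the standard library's sync/atomic (Go 1.19 or later) rather than the atomic package kingbus imports, and heartbeatInfo, newHeartbeatInfo, and snapshot are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// heartbeatInfo is a hypothetical, cut-down stand-in for the atomic fields of
// BinlogProgress. It uses the standard library instead of the atomic package
// that kingbus imports.
type heartbeatInfo struct {
	lastBinlogFile   atomic.Value  // always holds a string
	lastFilePosition atomic.Uint32 // requires Go 1.19 or later
}

// newHeartbeatInfo stores an initial empty string so that Load never
// returns nil.
func newHeartbeatInfo() *heartbeatInfo {
	h := new(heartbeatInfo)
	h.lastBinlogFile.Store("")
	return h
}

// snapshot reads both values without taking a lock. Each read is atomic on
// its own, but the pair is not read as one consistent unit.
func (h *heartbeatInfo) snapshot() (string, uint32) {
	return h.lastBinlogFile.Load().(string), h.lastFilePosition.Load()
}

func main() {
	h := newHeartbeatInfo()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // writer goroutine, playing the role of updateProcess
		defer wg.Done()
		h.lastBinlogFile.Store("mysql-bin.000001")
		h.lastFilePosition.Store(4096)
	}()
	wg.Wait()

	file, pos := h.snapshot()
	fmt.Printf("file=%s pos=%d\n", file, pos)
}
```

Plain fields such as lastSaveGtid and persistentTime carry no such protection, which is consistent with them being touched only by the goroutine that calls updateProcess.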

newBinlogProgress

kingbus/server/binlog_progress.go

func newBinlogProgress(store storage.Storage) (*BinlogProgress, error) {
    var err error
    p := new(BinlogProgress)

    p.trxBoundaryParser = new(mysql.TransactionBoundaryParser)
    p.trxBoundaryParser.Reset()

    p.currentGtid = atomic.NewString("")
    p.lastBinlogFile = atomic.NewString("")
    p.lastFilePosition = atomic.NewUint32(0)

    p.persistentAppliedIndex = 0
    p.persistentTime = time.Unix(0, 0)

    //get executed gtid_set
    //This value may be old, but resetBinlogProgress will update it to the latest
    p.executedGtidSet, err = store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey)
    if err != nil {
        log.Log.Errorf("newBinlogProgress:get executedGtidSet error,err:%s", err)
        return nil, err
    }
    p.executedGtidSetStr = atomic.NewString(p.executedGtidSet.String())
    p.store = store

    return p, nil
}
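  • newBinlogProgress creates a BinlogProgress and its mysql.TransactionBoundaryParser, initializes the atomic fields and the persistence bookkeeping, and then loads executedGtidSet via store.GetGtidSet(gomysql.MySQLFlavor, storage.ExecutedGtidSetKey), returning the error if the lookup fails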

updateProcess

kingbus/server/binlog_progress.go

//updateProcess update and save executedGtid set
func (s *BinlogProgress) updateProcess(raftIndex uint64, eventRawData []byte) error {
    var err error

    //parse event header
    h := new(replication.EventHeader)
    err = h.Decode(eventRawData)
    if err != nil {
        log.Log.Errorf("Decode error,err:%s,buf:%v", err, eventRawData)
        return err
    }
    //set the heartbeat info
    s.lastFilePosition.Store(h.LogPos)

    //remove header
    eventRawData = eventRawData[replication.EventHeaderSize:]
    eventLen := int(h.EventSize) - replication.EventHeaderSize
    if len(eventRawData) != eventLen {
        return fmt.Errorf("invalid data size %d in event %s, less event length %d",
            len(eventRawData), h.EventType, eventLen)
    }
    //remove crc32
    eventRawData = eventRawData[:len(eventRawData)-replication.BinlogChecksumLength]

    //the eventRawData maybe the first divided packet, but must not be query event
    //so don't worry
    eventBoundaryType, err := s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData)
    if err != nil {
        log.Log.Errorf("GetEventBoundaryType error,err:%s,header:%v",
            err, *h)
        return err
    }
    //ignore updateState error, maybe a partial trx
    err = s.trxBoundaryParser.UpdateState(eventBoundaryType)
    if err != nil {
        log.Log.Warnf("trxBoundaryParser UpdateState error,err:%s,header:%v", err, *h)
        s.trxBoundaryParser.Reset()
        s.currentGtid.Store("")
        return nil
    }

    currentGtidStr := s.currentGtid.Load()
    if s.trxBoundaryParser.IsNotInsideTransaction() &&
        len(currentGtidStr) != 0 && s.lastSaveGtid != currentGtidStr {

        log.Log.Debugf("current gtid is :%s,add into executedGtidSet:%s",
            currentGtidStr, s.executedGtidSet.String())
        //update executedGtidSet
        err = s.executedGtidSet.Update(currentGtidStr)
        if err != nil {
            return err
        }
        s.lastSaveGtid = currentGtidStr
        s.executedGtidSetStr.Store(s.executedGtidSet.String())

        //save the raftIndex and executedGtidSet at the same time
        if raftIndex-s.persistentAppliedIndex > persistentCount ||
            time.Now().Sub(s.persistentTime) > persistentTimeInterval {
            err = s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)
            if err != nil {
                log.Log.Errorf("SetGtidSet error,err:%s,key:%s,value:%s",
                    err, storage.ExecutedGtidSetKey, s.executedGtidSet.String())
                return err
            }

            s.persistentAppliedIndex = raftIndex
            s.persistentTime = time.Now()
        }
    }
    return nil
}
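  • updateProcess decodes eventRawData into a replication.EventHeader and stores h.LogPos in lastFilePosition; it then strips the header and the trailing CRC32 checksum, obtains eventBoundaryType via s.trxBoundaryParser.GetEventBoundaryType(h, eventRawData), and advances the parser with s.trxBoundaryParser.UpdateState(eventBoundaryType), resetting the parser and clearing currentGtid if that fails; once the parser is no longer inside a transaction and currentGtid is non-empty and differs from lastSaveGtid, it adds that GTID to the set via s.executedGtidSet.Update(currentGtidStr); finally, when more than persistentCount raft entries or more than persistentTimeInterval have passed since the last save, it persists raftIndex and executedGtidSet together via s.store.SetBinlogProgress(raftIndex, s.executedGtidSet)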

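Summary

kingbus's binlog_progress.go provides newBinlogProgress and updateProcess, which together track the binlog progress (the executed GTID set and the applied raft index) and persist it to storage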

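Original-content notice: this article is published on the Tencent Cloud developer community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com to have it removed.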
