前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kingbus的DumpBinlogAt

聊聊kingbus的DumpBinlogAt

作者头像
code4it
发布2020-06-24 10:18:43
3080
发布2020-06-24 10:18:43
举报

本文主要研究一下kingbus的DumpBinlogAt

DumpBinlogAt

kingbus/server/binlog_server.go

//DumpBinlogAt implements dump binlog event by slave executed gtid set
func (s *BinlogServer) DumpBinlogAt(ctx context.Context,
    startRaftIndex uint64, slaveGtids *gomysql.MysqlGTIDSet,
    eventC chan<- *storagepb.BinlogEvent, errorC chan<- error) error {
    var inExcludeGroup = false

    //new a binlog event reader from startRaftIndex, then send event to slave one by one
    reader, err := s.store.NewEntryReaderAt(startRaftIndex)
    if err != nil {
        log.Log.Errorf("NewEntryReaderAt error,err:%s,raftIndex:%d", err, startRaftIndex)
        return err
    }

    nextRaftIndex := reader.NextRaftIndex()
    log.Log.Infof("DumpBinlogAt:raftIndex:%d,slaveGtids:%s", nextRaftIndex, slaveGtids.String())
    go func() {
        for {
            //the next read raftIndex must be little than AppliedIndex
            if nextRaftIndex <= s.kingbusInfo.AppliedIndex() {
                raftEntry, err := reader.GetNext()
                if err != nil {
                    log.Log.Errorf("reader.GetNext error,err:%s,nextRaftIndex:%d,AppliedIndex:%d",
                        err, nextRaftIndex, s.kingbusInfo.AppliedIndex())
                    select {
                    case errorC <- err:
                    default:
                    }
                    return //need quit
                }
                nextRaftIndex = reader.NextRaftIndex()

                //this entry is not binlog event
                if utils.IsBinlogEvent(raftEntry) == false {
                    continue
                }
                event := utils.DecodeBinlogEvent(raftEntry)
                //filter the event in slave gtids,if the event has send to slave
                inExcludeGroup = s.skipEvent(event, slaveGtids, inExcludeGroup)
                if inExcludeGroup {
                    continue
                }

                select {
                case eventC <- event:
                case <-ctx.Done():
                    log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())
                    select {
                    case errorC <- ctx.Err():
                    default:
                    }
                    return //need quit
                }
            } else {
                select {
                case <-s.broadcast.Receive():
                    break
                case <-ctx.Done():
                    log.Log.Errorf("binlog server receive cancel, need quit,err:%s", ctx.Err())
                    select {
                    case errorC <- ctx.Err():
                    default:
                    }
                    return //need quit
                }
            }
        }
    }()

    return nil
}
  • DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventC

NewEntryReaderAt

kingbus/storage/disk_storage.go

//NewEntryReaderAt create a DiskEntryReader at raftIndex
func (s *DiskStorage) NewEntryReaderAt(raftIndex uint64) (EntryReader, error) {
    err := s.checkRaftIndex(raftIndex)
    if err != nil {
        log.Log.Errorf("checkRaftIndex error,err:%s,raftIndex:%d", err, raftIndex)
        return nil, err
    }

    reader := new(DiskEntryReader)
    reader.indexReadAt = raftIndex
    reader.store = s

    return reader, nil
}
  • NewEntryReaderAt方法先通过s.checkRaftIndex(raftIndex)校验一下index,然后创建DiskEntryReader,设置indexReadAt为raftIndex

skipEvent

kingbus/server/binlog_server.go

//skipEvent filter the event has been executed by slave
func (s *BinlogServer) skipEvent(event *storagepb.BinlogEvent, slaveGtids *gomysql.MysqlGTIDSet, inExcludeGroup bool) bool {
    switch replication.EventType(event.Type) {
    case replication.GTID_EVENT:
        //remove header
        eventBody := event.Data[replication.EventHeaderSize:]
        //remove crc32
        eventBody = eventBody[:len(eventBody)-replication.BinlogChecksumLength]

        gtidEvent := &replication.GTIDEvent{}
        if err := gtidEvent.Decode(eventBody); err != nil {
            log.Log.Errorf("Decode gtid event error,err:%s", err)
            return true
        }
        u, err := uuid.FromBytes(gtidEvent.SID)
        if err != nil {
            log.Log.Errorf("FromBytes error,err:%s,sid:%v", err, gtidEvent.SID)
            return true
        }
        gtidStr := fmt.Sprintf("%s:%d", u.String(), gtidEvent.GNO)
        currentGtidset, err := gomysql.ParseMysqlGTIDSet(gtidStr)
        if err != nil {
            log.Log.Errorf("ParseMysqlGTIDSet error,err:%s,gtid:%s", err, gtidStr)
            return true
        }
        return slaveGtids.Contain(currentGtidset)
    case replication.ROTATE_EVENT:
        return false
    }
    return inExcludeGroup
}
  • skipEvent方法根据replication.EventType(event.Type)类型来判断,对于能够取到currentGtidset的通过slaveGtids.Contain(currentGtidset)判断,对于replication.ROTATE_EVENT返回false

小结

DumpBinlogAt方法通过s.store.NewEntryReaderAt(startRaftIndex)获取reader,然后获取nextRaftIndex,之后通过utils.DecodeBinlogEvent(raftEntry)获取event,然后通过s.skipEvent(event, slaveGtids, inExcludeGroup)来判断是否该skip,之后将event写入到eventC

doc

  • binlog_server
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • DumpBinlogAt
  • NewEntryReaderAt
  • skipEvent
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档