前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kingbus的command.go

聊聊kingbus的command.go

作者头像
code4it
发布2020-06-24 10:18:04
3030
发布2020-06-24 10:18:04
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下kingbus的command.go

Close

kingbus/mysql/command.go

// Close shuts the connection down. It is a no-op when the closed flag is
// already set; otherwise it sets the flag, closes c.Conn, invokes c.cancel
// and finally drops the reference to c.Conn.
//
// NOTE(review): the Load/Store pair is not a single atomic step, so two
// goroutines calling Close at the same time could both pass the check —
// confirm whether callers can race here.
func (c *Conn) Close() {
    if c.closed.Load() {
        return
    }
    c.closed.Store(true)
    c.Conn.Close()
    c.cancel()
    c.Conn = nil
}
  • Close方法先检查c.closed,若已关闭则直接返回;否则依次执行c.closed.Store(true)、c.Conn.Close()、c.cancel(),最后将c.Conn置为nil

kingbus/mysql/command.go

handleQuery

kingbus/mysql/command.go

// handleQuery parses a single SQL statement and dispatches it to the matching
// handler. Only SHOW, SELECT, SET and KILL statements are handled; any other
// statement yields ErrSQLNotSupport. A parse failure is returned as is.
//
// A panic raised while handling the statement is recovered, logged together
// with the SQL text and a stack trace, and reported through the named result
// err, so it does not escape to the caller.
func (c *Conn) handleQuery(sql string) (err error) {
    defer func() {
        e := recover()
        if e == nil {
            return
        }
        // Panic values are not required to implement error (e.g. panic("msg")).
        // Convert them so a panic is never swallowed with a nil result.
        myerr, ok := e.(error)
        if !ok {
            myerr = fmt.Errorf("handleQuery panic: %v", e)
        }
        const size = 4096
        buf := make([]byte, size)
        buf = buf[:runtime.Stack(buf, false)]
        log.Log.Errorf("Conn handleQuery error,err:%s,sql:%s,stack:%s", myerr, sql, string(buf))
        err = myerr
    }()

    log.Log.Infof("handleQuery sql:%s", sql)
    sqlParser := parser.New()
    stmt, err := sqlParser.ParseOneStmt(sql, "", "")
    if err != nil {
        return err
    }
    switch v := stmt.(type) {
    case *ast.ShowStmt:
        return c.handleShow(v)
    case *ast.SelectStmt:
        return c.handleSelect(v)
    case *ast.SetStmt:
        return c.handleSet(v)
    case *ast.KillStmt:
        return c.handleKill(v)
    //todo get metrics of slave
    default:
        return ErrSQLNotSupport
    }
}
  • handleQuery方法通过sqlParser来解析stmt,然后根据stmt的type来执行不同的方法;对于ShowStmt执行c.handleShow(v),对于SelectStmt执行c.handleSelect(v),对于SetStmt执行c.handleSet(v),对于KillStmt执行c.handleKill(v)

writeOK

kingbus/mysql/resp.go

// writeOK sends an OK response built from r to the client. A nil r is treated
// as an empty result. The connection status bits are OR-ed into r.Status
// before the packet is encoded, so the caller's Result is updated as well.
func (c *Conn) writeOK(r *gomysql.Result) error {
    if r == nil {
        r = new(gomysql.Result)
    }
    r.Status |= c.status

    // The buffer starts with 4 zero bytes; presumably these are the packet
    // header that WritePacket fills in — TODO confirm.
    pkt := make([]byte, 4, 32)
    pkt = append(pkt, gomysql.OK_HEADER)
    pkt = append(pkt, gomysql.PutLengthEncodedInt(r.AffectedRows)...)
    pkt = append(pkt, gomysql.PutLengthEncodedInt(r.InsertId)...)

    if c.capability&gomysql.CLIENT_PROTOCOL_41 != 0 {
        // Status flags as two little-endian bytes, followed by two zero bytes
        // (the warning count in the protocol 4.1 OK packet layout).
        statusLow, statusHigh := byte(r.Status), byte(r.Status>>8)
        pkt = append(pkt, statusLow, statusHigh, 0, 0)
    }

    return c.WritePacket(pkt)
}
  • writeOK方法用于向客户端写回OK包,例如响应ping命令,以及handleRegisterSlave注册成功后的应答(c.writeOK(nil))

handleBinlogDumpGtid

kingbus/mysql/command.go

// handleBinlogDumpGtid serves a GTID based binlog dump request from a slave.
//
// It parses the dump packet, checks the slave's executed GTID set with
// CheckGtidSet, sends a fake rotate event and the format description event,
// starts DumpBinlogAt and then forwards every event received on the event
// channel to the slave. When the heartbeat timer fires before the next event
// is written, a heartbeat event is sent instead. The slave identified by the
// SlaveUUID user variable is unregistered when the function returns.
//
// The function only returns with an error: a failure of one of the steps
// above, an error delivered on the dump error channel, or ctx.Err() once ctx
// is done.
//
// todo kill the slave with same uuid
func (c *Conn) handleBinlogDumpGtid(ctx context.Context, data []byte) error {
    var (
        err             error
        heartbeatPeriod time.Duration
    )

    slaveGtidExecuted, slaveServerID, err := c.parseMysqlGtidDumpPacket(data)
    if err != nil {
        return err
    }

    //UnregisterSlave
    // Use a checked assertion: a slave that never set the uuid variable must
    // get an error back instead of panicking this goroutine.
    slaveUUID, ok := c.userVariables[SlaveUUID].(string)
    if !ok {
        return fmt.Errorf("handleBinlogDumpGtid: user variable %v is missing or not a string", SlaveUUID)
    }
    defer c.binlogServer.UnregisterSlave(slaveUUID)

    err = c.binlogServer.CheckGtidSet(gomysql.MySQLFlavor, slaveGtidExecuted)
    if err != nil {
        log.Log.Errorf("CheckGtidSet error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
        return err
    }

    //get the previousGtidEvent raft index
    preGtidEventIndex, err := c.binlogServer.GetMySQLDumpAt(slaveGtidExecuted)
    if err != nil {
        log.Log.Errorf("GetMySQLDumpAt error,err:%s,slaveGtids:%v", err, slaveGtidExecuted)
        return err
    }

    fde, err := c.binlogServer.GetFde(preGtidEventIndex)
    if err != nil {
        log.Log.Errorf("handleBinlogDumpGtid:GetFde error,err:%s, gtidSet: %s,flavor:%s",
            err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
        return err
    }

    //1.send fake rotate event
    // The master server id is read as a little-endian uint32 at offset 5 of
    // the format description event.
    masterServerID := binary.LittleEndian.Uint32(fde[5:])
    fileName, err := c.binlogServer.GetNextBinlogFile(preGtidEventIndex)
    if err != nil {
        log.Log.Errorf("handleBinlogDumpGtid:GetNextBinlogFile error,err:%s, gtidSet: %s,flavor:%s",
            err, slaveGtidExecuted.String(), gomysql.MySQLFlavor)
        return err
    }
    err = c.sendFakeRotateEvent(masterServerID, fileName)
    if err != nil {
        log.Log.Errorf("handleBinlogDumpGtid:sendFakeRotateEvent error,err:%s, serverId: %d,fileName:%s",
            err, masterServerID, fileName)
        return err
    }

    //2.send fde
    err = c.sendFormatDescriptionEvent(fde)
    if err != nil {
        log.Log.Errorf("handleBinlogDumpGtid:sendFormatDescriptionEvent error,err:%s, fde:%v",
            err, fde)
        return err
    }

    //3.send event
    eventC := make(chan *storagepb.BinlogEvent, 2000)
    errorC := make(chan error, 1)
    err = c.binlogServer.DumpBinlogAt(ctx, preGtidEventIndex, slaveGtidExecuted, eventC, errorC)
    if err != nil {
        log.Log.Errorf("DumpBinlogAt error,err:%s,preGtidEventIndex:%d,slaveGtidExecuted:%v",
            err, preGtidEventIndex, slaveGtidExecuted)
        return err
    }

    //4.new metrics
    // The metric name spelling ("thoughput") is kept as is: changing it would
    // rename the registered metric.
    slaveEps := metrics.NewMeter()
    slaveThroughput := metrics.NewMeter()
    metrics.Register(fmt.Sprintf("slave_eps_%d", slaveServerID), slaveEps)
    metrics.Register(fmt.Sprintf("slave_thoughput_%d", slaveServerID), slaveThroughput)

    // The stored int64 is converted directly to a time.Duration, i.e. it is
    // interpreted as nanoseconds. The assertion panics if the stored value is
    // not an int64.
    if period, ok := c.userVariables[MasterHeartbeatPeriod]; ok {
        heartbeatPeriod = time.Duration(period.(int64))
    } else {
        heartbeatPeriod = MaxHeartbeatPeriod
    }
    timer := time.NewTimer(heartbeatPeriod)
    // Release the timer on every return path.
    defer timer.Stop()
    for {
        select {
        case event := <-eventC:
            //event is not divided or the first divided event
            //WriteEvent need write a ok_header(one byte),after the header size
            if event.DividedCount == 0 || (0 < event.DividedCount && event.DividedSeqNum == 0) {
                err = c.WriteEvent(event.Data, true)
                if err != nil {
                    log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
                    return err
                }
            } else {
                err = c.WriteEvent(event.Data, false)
                if err != nil {
                    log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
                    return err
                }
                //event is divided,and the last packet size is MaxPayloadLen
                //need send a empty packet
                //https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html
                if event.DividedSeqNum == event.DividedCount-1 && len(event.Data) == MaxPayloadLen {
                    err = c.WriteEvent(nil, false)
                    if err != nil {
                        log.Log.Errorf("WriteEvent error,err:%s,event:%v", err, *event)
                        return err
                    }
                }
            }
            slaveEps.Mark(1)
            slaveThroughput.Mark(int64(len(event.Data)))
            //reset heartbeat period
            resetTime(timer, heartbeatPeriod)
        case err = <-errorC:
            log.Log.Errorf("binlog server DumpBinlogAt error,err:%s", err)
            return err
        case <-ctx.Done():
            log.Log.Errorf("handleBinlogDumpGtid:ctx done,quit")
            return ctx.Err()
        case <-timer.C:
            //kingbus send the heartbeat log event which received by syncer to slave
            log.Log.Debugf("send a heartbeat log event to slave")
            err = c.sendHeartbeatEvent(masterServerID)
            if err != nil {
                return err
            }
            //reset heartbeat period
            resetTime(timer, heartbeatPeriod)
        }
    }
}
  • handleBinlogDumpGtid方法先解析dump请求并通过CheckGtidSet校验slave的GTID集合,然后依次执行c.sendFakeRotateEvent、c.sendFormatDescriptionEvent、c.binlogServer.DumpBinlogAt,最后在for循环中通过select处理eventC、errorC、ctx.Done()以及心跳定时器timer.C

handleRegisterSlave

kingbus/mysql/command.go

// handleRegisterSlave decodes a slave registration packet, registers the slave
// with the binlog server and answers with an OK packet.
//
// The payload is decoded in this order: server id (4 bytes, little endian),
// host name, user and password (each a 1-byte length followed by that many
// bytes), port (2 bytes), rank (4 bytes) and master id (4 bytes). A payload
// that is too short for this layout is rejected with an error instead of
// panicking on an out-of-range index, since the bytes come from the client.
//
// Zombie dump threads are killed before the slave is registered. The slave
// UUID is taken from the SlaveUUID user variable when it is set, and is left
// empty otherwise.
func (c *Conn) handleRegisterSlave(data []byte) error {
    var s Slave
    pos := 0

    malformed := func() error {
        return fmt.Errorf("handleRegisterSlave: malformed packet, length %d", len(data))
    }
    // has reports whether at least n bytes are left starting at pos.
    has := func(n int) bool { return len(data)-pos >= n }
    // readString consumes a 1-byte length prefix followed by that many bytes.
    readString := func() (string, bool) {
        if !has(1) {
            return "", false
        }
        n := int(data[pos])
        pos++
        if !has(n) {
            return "", false
        }
        v := string(data[pos : pos+n])
        pos += n
        return v, true
    }

    if !has(4) {
        return malformed()
    }
    s.ServerID = int32(binary.LittleEndian.Uint32(data[pos:]))
    pos += 4

    var ok bool
    if s.HostName, ok = readString(); !ok {
        return malformed()
    }
    if s.User, ok = readString(); !ok {
        return malformed()
    }
    if s.Password, ok = readString(); !ok {
        return malformed()
    }

    // Fixed-size tail: port (2) + rank (4) + master id (4).
    if !has(10) {
        return malformed()
    }
    s.Port = int16(binary.LittleEndian.Uint16(data[pos:]))
    pos += 2

    s.Rank = binary.LittleEndian.Uint32(data[pos:])
    pos += 4

    s.MasterID = binary.LittleEndian.Uint32(data[pos:])
    s.State = REGISTERED

    //kill the zombie dump thread with same uuid
    c.killZombieDumpThreads()

    // s.UUID keeps its zero value "" when the variable is not set.
    if uuid, ok := c.userVariables[SlaveUUID]; ok {
        s.UUID = uuid.(string)
    }
    s.ConnectTime = time.Now()
    s.Conn = c
    err := c.binlogServer.RegisterSlave(&s)
    if err != nil {
        return err
    }
    // NOTE(review): s is logged with %v and includes the Password field —
    // confirm whether the credential should be kept out of the log.
    log.Log.Infof("handleRegisterSlave:slave info:%v", s)
    return c.writeOK(nil)
}
  • handleRegisterSlave方法用于处理COM_REGISTER_SLAVE命令,这里先执行c.killZombieDumpThreads(),然后执行c.binlogServer.RegisterSlave(&s)

小结

kingbus的command.go提供了Close、handleQuery、handleBinlogDumpGtid、handleRegisterSlave等方法;writeOK方法则定义在resp.go中

doc

  • command
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Close
  • handleQuery
  • writeOK
  • handleBinlogDumpGtid
  • handleRegisterSlave
  • 小结
  • doc
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档