前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊kingbus的binlog_server_handler.go

聊聊kingbus的binlog_server_handler.go

原创
作者头像
code4it
修改2020-06-19 11:50:59
3180
修改2020-06-19 11:50:59
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下kingbus的binlog_server_handler.go

StartBinlogServer

kingbus/api/binlog_server_handler.go

代码语言:javascript
复制
//StartBinlogServer implements start a binlog server
func (h *BinlogServerHandler) StartBinlogServer(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()
​
    var args config.BinlogServerConfig
    var err error
​
    defer func() {
        if err != nil {
            log.Log.Errorf("StartBinlogServer error,err: %s", err)
            echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError(err.Error()))
        }
    }()
​
    err = echoCtx.Bind(&args)
    if err != nil {
        return err
    }
    kingbusIP := h.svr.GetIP()
    //check args
    err = args.Check(kingbusIP)
    if err != nil {
        return err
    }
    //start syncer server
    err = h.svr.StartServer(config.BinlogServerType, &args)
    if err != nil {
        log.Log.Errorf("start server error,err:%s,args:%v", err, args)
        return err
    }
​
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
  • StartBinlogServer方法主要是通过h.svr.StartServer(config.BinlogServerType, &args)来启动binlog server

StartServer

kingbus/server/server.go

代码语言:javascript
复制
//StartServer start sub servers:syncer server or binlog master server
func (s *KingbusServer) StartServer(svrType config.SubServerType, args interface{}) error {
    var err error
    switch svrType {
    case config.SyncerServerType:
        if s.IsSyncerStarted() {
            return ErrStarted
        }
        syncerArgs, ok := args.(*config.SyncerArgs)
        if !ok {
            log.Log.Errorf("StartServer args is illegal,args:%v", args)
            return ErrArgs
        }
        err = s.startSyncerServer(syncerArgs)
        if err != nil {
            log.Log.Errorf("startSyncerServer error,err:%s,args:%v", err, *syncerArgs)
            return ErrArgs
        }
        //start to propose binlog event to raft cluster
        s.StartProposeBinlog(s.syncer.ctx)
        log.Log.Debugf("start syncer,and propose!!!")
        return nil
    case config.BinlogServerType:
        if s.IsBinlogServerStarted() {
            return ErrStarted
        }
        masterArgs, ok := args.(*config.BinlogServerConfig)
        if !ok {
            log.Log.Errorf("StartServer args is illegal,args:%v", args)
            return ErrArgs
        }
        err = s.startMasterServer(masterArgs)
        if err != nil {
            log.Log.Errorf("startMasterServer error,err:%s,args:%v", err, *masterArgs)
            return ErrArgs
        }
        return nil
    default:
        log.Log.Fatalf("StartServer:server type not support,serverType:%v", svrType)
    }
    return nil
}
  • StartServer方法根据svrType参数来启动不同的server,若是config.SyncerServerType则启动的是SyncerServer,若是config.BinlogServerType则启动的是BinlogServer

StopBinlogServer

kingbus/api/binlog_server_handler.go

代码语言:javascript
复制
//StopBinlogServer implements stop binlog server
func (h *BinlogServerHandler) StopBinlogServer(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()
    h.svr.StopServer(config.BinlogServerType)
    return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(""))
}
  • StopBinlogServer方法通过h.svr.StopServer(config.BinlogServerType)来关闭binlog server

StopServer

kingbus/server/server.go

代码语言:javascript
复制
//StopServer stop sub server
func (s *KingbusServer) StopServer(svrType config.SubServerType) {
    switch svrType {
    case config.SyncerServerType:
        if s.IsSyncerStarted() {
            s.syncer.Stop()
        }
    case config.BinlogServerType:
        if s.IsBinlogServerStarted() {
            s.master.Stop()
        }
    default:
        log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
    }
}
  • StopServer方法根据svrType参数来关闭不同的server,若是config.SyncerServerType则关闭的是SyncerServer,若是config.BinlogServerType则关闭的是BinlogServer

GetBinlogServerStatus

kingbus/api/binlog_server_handler.go

代码语言:javascript
复制
//GetBinlogServerStatus implements get binlog server status in the runtime state
func (h *BinlogServerHandler) GetBinlogServerStatus(echoCtx echo.Context) error {
    h.l.Lock()
    defer h.l.Unlock()
​
    status := h.svr.GetServerStatus(config.BinlogServerType)
    if masterStatus, ok := status.(*config.BinlogServerStatus); ok {
        return echoCtx.JSON(http.StatusOK, utils.NewResp().SetData(masterStatus))
    }
    return echoCtx.JSON(http.StatusInternalServerError, utils.NewResp().SetError("no resp"))
}
  • GetBinlogServerStatus方法通过h.svr.GetServerStatus(config.BinlogServerType)来获取server status

GetServerStatus

代码语言:javascript
复制
//GetServerStatus get the sub server status
func (s *KingbusServer) GetServerStatus(svrType config.SubServerType) interface{} {
    switch svrType {
    case config.SyncerServerType:
        var syncerStatus config.SyncerStatus
        if s.IsSyncerStarted() {
            cfg := s.syncer.cfg
            syncerStatus.MysqlAddr = fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
            syncerStatus.MysqlUser = cfg.User
            syncerStatus.MysqlPassword = cfg.Password
            syncerStatus.SemiSync = cfg.SemiSyncEnabled
​
            syncerStatus.Status = config.ServerRunningStatus
            syncerStatus.CurrentGtid = s.CurrentGtidStr()
            syncerStatus.LastBinlogFile = s.LastBinlogFile()
            syncerStatus.LastFilePosition = s.LastFilePosition()
            syncerStatus.ExecutedGtidSet = s.ExecutedGtidSetStr()
​
            purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
            if err != nil {
                log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
            }
            syncerStatus.PurgedGtidSet = purgedGtids.String()
        } else {
            syncerStatus.Status = config.ServerStoppedStatus
        }
        return &syncerStatus
    case config.BinlogServerType:
        var status config.BinlogServerStatus
        if s.IsBinlogServerStarted() {
            cfg := s.master.cfg
            status.Addr = cfg.Addr
            status.User = cfg.User
            status.Password = cfg.Password
​
            status.Slaves = make([]*mysql.Slave, 0, 2)
            slaves := s.master.GetSlaves()
            for _, s := range slaves {
                status.Slaves = append(status.Slaves, s)
            }
            status.CurrentGtid = s.CurrentGtidStr()
            status.LastBinlogFile = s.LastBinlogFile()
            status.LastFilePosition = s.LastFilePosition()
            status.ExecutedGtidSet = s.ExecutedGtidSetStr()
​
            purgedGtids, err := s.store.GetGtidSet("mysql", storage.GtidPurgedKey)
            if err != nil {
                log.Log.Fatalf("get PurgedGtidSet error,err:%s", err)
            }
            status.PurgedGtidSet = purgedGtids.String()
            status.Status = config.ServerRunningStatus
        } else {
            status.Status = config.ServerStoppedStatus
        }
        return &status
    default:
        log.Log.Fatalf("StopServer:server type not support,serverType:%v", svrType)
    }
    return nil
}
  • GetServerStatus方法根据svrType参数来获取不同的status,若是config.SyncerServerType则获取的是SyncerStatus,若是config.BinlogServerType则获取的是BinlogServerStatus

小结

kingbus的binlog_server_handler提供了StartBinlogServer、StopBinlogServer、GetBinlogServerStatus,他们均委托给了server.go的对应方法

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • StartBinlogServer
    • StartServer
    • StopBinlogServer
      • StopServer
      • GetBinlogServerStatus
        • GetServerStatus
        • 小结
        • doc
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档