前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊canal-go的SimpleCanalConnector

聊聊canal-go的SimpleCanalConnector

作者头像
code4it
发布2020-07-02 23:02:42
8020
发布2020-07-02 23:02:42
举报

本文主要研究一下canal-go的SimpleCanalConnector

SimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

// SimpleCanalConnector holds the connection settings and session state of a
// client that talks to a single canal server over TCP.
type SimpleCanalConnector struct {
    // Address and Port locate the canal server; doConnect dials "Address:Port" over TCP.
    Address           string
    Port              int
    // UserName and PassWord are sent to the server in the ClientAuth packet.
    UserName          string
    PassWord          string
    // SoTime is filled from the constructor's soTimeOut argument.
    // NOTE(review): presumably a socket timeout; it is not read by the code visible here — confirm.
    SoTime            int32
    // IdleTimeOut is sent to the server as both the net read and net write timeout during authentication.
    IdleTimeOut       int32
    // ClientIdentity supplies the Destination and ClientId placed in every request.
    ClientIdentity    pb.ClientIdentity
    // Connected is set to true by Connect and reset to false by DisConnection.
    Connected         bool
    // Running is checked by the request methods right after waitClientRunning.
    // NOTE(review): presumably set by waitClientRunning, which is not visible here — confirm.
    Running           bool
    // Filter is the last successfully subscribed filter; Connect re-subscribes with it when non-empty.
    Filter            string
    // RollbackOnConnect makes Connect (and DisConnection, when connected) issue RollBack(0).
    RollbackOnConnect bool
    // LazyParseEntry is not read by the code visible here.
    // NOTE(review): presumably defers entry decoding in receiveMessages — confirm.
    LazyParseEntry    bool
}
  • SimpleCanalConnector定义了Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry属性

NewSimpleCanalConnector

canal-go-v1.0.7/client/simple_canal_connector.go

// NewSimpleCanalConnector creates a SimpleCanalConnector for the canal server
// at address:port, authenticating as username/password against destination.
//
// soTimeOut is stored in SoTime and idleTimeOut in IdleTimeOut. The client id
// is fixed to 1001 and RollbackOnConnect is enabled.
func NewSimpleCanalConnector(address string, port int, username string, password string, destination string, soTimeOut int32, idleTimeOut int32) *SimpleCanalConnector {
    connector := new(SimpleCanalConnector)

    // Where to connect and who we are.
    connector.Address = address
    connector.Port = port
    connector.UserName = username
    connector.PassWord = password
    connector.ClientIdentity = pb.ClientIdentity{Destination: destination, ClientId: 1001}

    // Timeouts and connect-time behaviour.
    connector.SoTime = soTimeOut
    connector.IdleTimeOut = idleTimeOut
    connector.RollbackOnConnect = true

    return connector
}
  • NewSimpleCanalConnector方法创建了SimpleCanalConnector实例

Connect

canal-go-v1.0.7/client/simple_canal_connector.go

// Connect establishes a session with the canal server.
//
// It returns nil immediately when the connector is already connected or
// running. Otherwise it dials and authenticates through doConnect,
// re-subscribes with c.Filter when one is set, and, when RollbackOnConnect is
// enabled, issues RollBack(0).
//
// Any error from these steps is returned to the caller. When a step after the
// dial fails, the socket is closed and Connected is left false, so a later
// Connect call starts from scratch instead of reusing a half-initialised session.
func (c *SimpleCanalConnector) Connect() error {
    if c.Connected || c.Running {
        return nil
    }

    err := c.doConnect()
    if err != nil {
        return err
    }

    if c.Filter != "" {
        err = c.Subscribe(c.Filter)
    }

    if err == nil && c.RollbackOnConnect {
        c.waitClientRunning()
        err = c.RollBack(0)
    }

    if err != nil {
        // Do not report a session that was never fully set up as connected,
        // and do not leak the socket opened by doConnect.
        c.Connected = false
        quitelyClose()
        return err
    }

    c.Connected = true
    return nil
}
  • Connect方法在c.Connected或c.Running为true时直接返回;否则先执行c.doConnect(),若c.Filter非空则执行c.Subscribe(c.Filter),若RollbackOnConnect为true则再执行c.waitClientRunning()及c.RollBack(0)方法,最后将c.Connected设置为true

DisConnection

canal-go-v1.0.7/client/simple_canal_connector.go

// DisConnection tears down the session: when the connector is connected and
// RollbackOnConnect is enabled it first issues RollBack(0), then it marks the
// connector as disconnected and closes the underlying connection.
func (c *SimpleCanalConnector) DisConnection() {
    shouldRollback := c.Connected && c.RollbackOnConnect
    if shouldRollback {
        c.RollBack(0)
    }

    c.Connected = false
    quitelyClose()
}

// quitelyClose closes the package-level connection if one has been set,
// discarding whatever Close returns.
func quitelyClose() {
    if conn == nil {
        return
    }
    conn.Close()
}
  • DisConnection方法在RollbackOnConnect为true且c.Connected为true时先执行c.RollBack(0),然后将c.Connected设置为false,最后通过quitelyClose执行conn.Close()

doConnect

canal-go-v1.0.7/client/simple_canal_connector.go

// doConnect dials the canal server and performs the handshake and client
// authentication exchange.
//
// Sequence: dial "Address:Port" over TCP, publish the socket in the
// package-level conn, read the server's handshake packet, send a ClientAuth
// packet carrying UserName, PassWord and IdleTimeOut (as both read and write
// timeout), then read the server's Ack. On success Connected is set to true.
//
// Every failure is returned as an error rather than raised with panic:
// dial/read/decode errors, an unsupported protocol version, an unexpected
// packet type, and an authentication rejection. The freshly dialled socket is
// closed on any failure after the dial.
//
// The receiver is a pointer so that the Connected assignment reaches the
// caller's connector instead of a copy.
func (c *SimpleCanalConnector) doConnect() (err error) {
    address := c.Address + ":" + fmt.Sprintf("%d", c.Port)
    con, err := net.Dial("tcp", address)
    if err != nil {
        return err
    }
    // readNextPacket and WriteWithHeader take no connection argument, so the
    // socket has to be published before the exchange starts.
    // NOTE(review): presumably both operate on the package-level conn — confirm.
    conn = con
    defer func() {
        if err != nil {
            // Do not leak the socket when the handshake or authentication fails.
            con.Close()
        }
    }()

    data, err := readNextPacket()
    if err != nil {
        return err
    }
    p := new(pb.Packet)
    if err = proto.Unmarshal(data, p); err != nil {
        return err
    }
    if p.GetVersion() != 1 {
        return fmt.Errorf("unsupported protocol version %d at this client", p.GetVersion())
    }
    if p.GetType() != pb.PacketType_HANDSHAKE {
        return fmt.Errorf("expect handshake but found packet type %v", p.GetType())
    }

    // The handshake body is decoded only to validate it; none of its fields are used.
    handshake := &pb.Handshake{}
    if err = proto.Unmarshal(p.GetBody(), handshake); err != nil {
        return err
    }

    ca := &pb.ClientAuth{
        Username:               c.UserName,
        Password:               []byte(c.PassWord),
        NetReadTimeoutPresent:  &pb.ClientAuth_NetReadTimeout{NetReadTimeout: c.IdleTimeOut},
        NetWriteTimeoutPresent: &pb.ClientAuth_NetWriteTimeout{NetWriteTimeout: c.IdleTimeOut},
    }
    caByteArray, err := proto.Marshal(ca)
    if err != nil {
        return err
    }
    packet := &pb.Packet{
        Type: pb.PacketType_CLIENTAUTHENTICATION,
        Body: caByteArray,
    }
    packArray, err := proto.Marshal(packet)
    if err != nil {
        return err
    }
    WriteWithHeader(packArray)

    pp, err := readNextPacket()
    if err != nil {
        return err
    }
    pk := &pb.Packet{}
    if err = proto.Unmarshal(pp, pk); err != nil {
        return err
    }
    if pk.Type != pb.PacketType_ACK {
        return errors.New("unexpected packet type when ack is expected")
    }

    ackBody := &pb.Ack{}
    if err = proto.Unmarshal(pk.GetBody(), ackBody); err != nil {
        return err
    }
    if ackBody.GetErrorCode() > 0 {
        return fmt.Errorf("something goes wrong when doing authentication:%s", ackBody.GetErrorMessage())
    }

    c.Connected = true
    return nil
}
  • doConnect方法通过net.Dial("tcp", address)建立连接,然后通过readNextPacket读取data,然后通过proto.Unmarshal(data, p)来解析,之后发送PacketType_CLIENTAUTHENTICATION数据进行鉴权,若ack成功则设置c.Connected为true

GetWithOutAck

canal-go-v1.0.7/client/simple_canal_connector.go

// GetWithOutAck requests a batch from the canal server without acknowledging it.
//
// A negative batchSize is replaced by 1000. A nil timeOut or nil units is sent
// as -1. The request is sent with auto-ack disabled and the reply is read via
// receiveMessages.
//
// It returns (nil, nil) when the connector is not running after
// waitClientRunning, and a non-nil error when encoding the request or
// receiving the reply fails.
func (c *SimpleCanalConnector) GetWithOutAck(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {
    c.waitClientRunning()
    if !c.Running {
        return nil, nil
    }

    fetchSize := batchSize
    if batchSize < 0 {
        fetchSize = 1000
    }

    timeout := int64(-1)
    if timeOut != nil {
        timeout = *timeOut
    }

    unit := int32(-1)
    if units != nil {
        unit = *units
    }

    request := &pb.Get{
        AutoAckPresent: &pb.Get_AutoAck{AutoAck: false},
        Destination:    c.ClientIdentity.Destination,
        ClientId:       strconv.Itoa(c.ClientIdentity.ClientId),
        FetchSize:      fetchSize,
        TimeoutPresent: &pb.Get_Timeout{Timeout: timeout},
        UnitPresent:    &pb.Get_Unit{Unit: unit},
    }
    requestBody, err := proto.Marshal(request)
    if err != nil {
        return nil, err
    }

    envelope := &pb.Packet{Type: pb.PacketType_GET, Body: requestBody}
    wire, err := proto.Marshal(envelope)
    if err != nil {
        return nil, err
    }
    WriteWithHeader(wire)

    reply, err := c.receiveMessages()
    if err != nil {
        return nil, err
    }
    return reply, nil
}
  • GetWithOutAck方法主要是执行WriteWithHeader(pa)以及c.receiveMessages()

Get

canal-go-v1.0.7/client/simple_canal_connector.go

// Get fetches a batch with GetWithOutAck and then acknowledges it with Ack.
//
// batchSize, timeOut and units are passed through to GetWithOutAck unchanged.
// It returns (nil, nil) when GetWithOutAck yields no message (which it does
// when the connector is not running); in that case there is nothing to
// acknowledge. Any error from the fetch or from the acknowledgement is
// returned with a nil message.
func (c *SimpleCanalConnector) Get(batchSize int32, timeOut *int64, units *int32) (*pb.Message, error) {
    message, err := c.GetWithOutAck(batchSize, timeOut, units)
    if err != nil {
        return nil, err
    }
    if message == nil {
        // GetWithOutAck returns a nil message with a nil error when the
        // connector is not running; reading message.Id would panic.
        return nil, nil
    }

    if err = c.Ack(message.Id); err != nil {
        return nil, err
    }
    return message, nil
}
  • Get方法先执行c.GetWithOutAck(batchSize, timeOut, units),再执行c.Ack(message.Id)

Ack

canal-go-v1.0.7/client/simple_canal_connector.go

// Ack sends a ClientAck packet for batchId to the canal server (per the
// original comment: once the batch has been processed, so the server can
// drop it).
//
// It returns nil without sending anything when the connector is not running
// after waitClientRunning, and returns the encoding error when either the ack
// or its enclosing packet cannot be marshalled.
func (c *SimpleCanalConnector) Ack(batchId int64) error {
    c.waitClientRunning()
    if !c.Running {
        return nil
    }

    ackBody, err := proto.Marshal(&pb.ClientAck{
        Destination: c.ClientIdentity.Destination,
        ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
        BatchId:     batchId,
    })
    if err != nil {
        return err
    }

    wire, err := proto.Marshal(&pb.Packet{
        Type: pb.PacketType_CLIENTACK,
        Body: ackBody,
    })
    if err != nil {
        return err
    }

    WriteWithHeader(wire)
    return nil
}
  • Ack方法主要是发送pb.PacketType_CLIENTACK

Subscribe

canal-go-v1.0.7/client/simple_canal_connector.go

// Subscribe sends a subscription request carrying filter to the canal server
// and waits for its Ack.
//
// It returns nil without sending anything when the connector is not running
// after waitClientRunning. It returns an error when the request cannot be
// encoded, the reply cannot be read or decoded, or the server's Ack carries a
// positive error code. On success the filter is remembered in c.Filter.
func (c *SimpleCanalConnector) Subscribe(filter string) error {
    c.waitClientRunning()
    if !c.Running {
        return nil
    }

    body, err := proto.Marshal(&pb.Sub{Destination: c.ClientIdentity.Destination, ClientId: strconv.Itoa(c.ClientIdentity.ClientId), Filter: filter})
    if err != nil {
        return err
    }
    pack := &pb.Packet{
        Type: pb.PacketType_SUBSCRIPTION,
        Body: body,
    }
    packet, err := proto.Marshal(pack)
    if err != nil {
        return err
    }
    WriteWithHeader(packet)

    paBytes, err := readNextPacket()
    if err != nil {
        return err
    }
    p := new(pb.Packet)
    if err = proto.Unmarshal(paBytes, p); err != nil {
        return err
    }
    ack := new(pb.Ack)
    if err = proto.Unmarshal(p.Body, ack); err != nil {
        return err
    }
    if ack.GetErrorCode() > 0 {
        return fmt.Errorf("failed to subscribe with reason::%s", ack.GetErrorMessage())
    }

    // Remember the filter only once the server has accepted it.
    c.Filter = filter
    return nil
}
  • Subscribe方法主要是发送pb.PacketType_SUBSCRIPTION

UnSubscribe

canal-go-v1.0.7/client/simple_canal_connector.go

// UnSubscribe sends an unsubscription request to the canal server and waits
// for its Ack.
//
// It returns nil without sending anything when the connector is not running
// after waitClientRunning, matching the guard used by Subscribe and Ack. It
// returns an error when the request cannot be encoded, the reply cannot be
// read or decoded, or the server's Ack carries a positive error code.
func (c *SimpleCanalConnector) UnSubscribe() error {
    c.waitClientRunning()
    if !c.Running {
        return nil
    }

    unSub, err := proto.Marshal(&pb.Unsub{
        Destination: c.ClientIdentity.Destination,
        ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
    })
    if err != nil {
        return err
    }

    pack, err := proto.Marshal(&pb.Packet{
        Type: pb.PacketType_UNSUBSCRIPTION,
        Body: unSub,
    })
    if err != nil {
        return err
    }
    WriteWithHeader(pack)

    p, err := readNextPacket()
    if err != nil {
        return err
    }
    // Decode into a fresh packet; unmarshalling into a nil pointer cannot work.
    reply := new(pb.Packet)
    if err = proto.Unmarshal(p, reply); err != nil {
        return err
    }
    ack := new(pb.Ack)
    if err = proto.Unmarshal(reply.Body, ack); err != nil {
        return err
    }
    if ack.GetErrorCode() > 0 {
        return errors.New("failed to unSubscribe with reason:" + ack.GetErrorMessage())
    }
    return nil
}
  • UnSubscribe方法主要是发送pb.PacketType_UNSUBSCRIPTION

RollBack

canal-go-v1.0.7/client/simple_canal_connector.go

// RollBack sends a ClientRollback packet for batchId to the canal server.
//
// It calls waitClientRunning first and then sends the request regardless of
// the Running flag. It returns the encoding error when either the rollback
// body or its enclosing packet cannot be marshalled; no reply is read.
func (c *SimpleCanalConnector) RollBack(batchId int64) error {
    c.waitClientRunning()

    rollbackBody, err := proto.Marshal(&pb.ClientRollback{
        Destination: c.ClientIdentity.Destination,
        ClientId:    strconv.Itoa(c.ClientIdentity.ClientId),
        BatchId:     batchId,
    })
    if err != nil {
        return err
    }

    wire, err := proto.Marshal(&pb.Packet{
        Type: pb.PacketType_CLIENTROLLBACK,
        Body: rollbackBody,
    })
    if err != nil {
        return err
    }

    WriteWithHeader(wire)
    return nil
}
  • RollBack方法主要是发送pb.PacketType_CLIENTROLLBACK

小结

SimpleCanalConnector定义了Address、Port、UserName、PassWord、SoTime、IdleTimeOut、ClientIdentity、Connected、Running、Filter、RollbackOnConnect、LazyParseEntry属性;它提供了Connect、DisConnection、GetWithOutAck、Get、Ack、Subscribe、UnSubscribe、RollBack方法

doc

  • simple_canal_connector
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SimpleCanalConnector
  • NewSimpleCanalConnector
  • Connect
  • DisConnection
  • doConnect
  • GetWithOutAck
  • Get
  • Ack
  • Subscribe
  • UnSubscribe
  • RollBack
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档