前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MySQL Redis一致性方案 推荐go-mysql/canal

MySQL Redis一致性方案 推荐go-mysql/canal

原创
作者头像
IT工作者
发布2022-07-22 10:23:08
1.5K0
发布2022-07-22 10:23:08
举报
文章被收录于专栏:程序技术知识

在项目初期时, 为解决Mysql和Redis的数据同步问题, 选择了阿里开源的Canal方案, 然后压力测试时, 解决了Redis并发写入的问题后, 随之而来的瓶颈出现在了Canal client消费速度, 本机测试大约1.3w/s, 不光如此, 使用阿里的Canal还存在一些弊端:

外部程序, 资料不够详尽, 社区交流效率低下, JAVA源码(我不熟悉)且臃肿(大致瞥了一眼)等等因素, 导致遇到问题不好解决, 目前开发测试阶段遇到问题也没有得到即时答复.排查问题困难重重.

然后经过一番思索和查阅, 找到了一个比较好的解决办法, golang实现, 同步速度大约6.2w/s, 与阿里canal官方公布的10w/s还差点距离,但是比我实际使用canal的1.3w/s提升太多了,然后这里我主要是够用了, 就没继续优化了

 当然还是得感谢MysqlToAll这个开源项目, 我的快速实现时站在他的肩膀上的.在搜索到这个MysqlToAll之前,我也是有思考过自己去实现过类似阿里Canal这种模拟Mysql从库拉取binlog数据的方案, 但是不知道有什么库可以用, 然后就看到了MysqlToAll使用的mysql官方canal库:

github.com/go-mysql-org/go-mysql/canal

好东西啊!! 使用golang的同学完全可以抛弃阿里的canal了

引用官方文档介绍一下:

// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...

// MySQL must open row format for binlog

摘录一段核心代码:

代码语言:javascript
复制
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
    ev := e.Event.(*replication.RowsEvent)
 
    // Caveat: table may be altered at runtime.
    schema := string(ev.Table.Schema)
    table := string(ev.Table.Table)
 
    t, err := c.GetTable(schema, table)
    if err != nil {
        return er
    }
    var action string
    switch e.Header.EventType {
    case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
        action = InsertAction
    case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
        action = DeleteAction
    case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
        action = UpdateAction
    default:
        return errors.Errorf("%s not supported now", e.Header.EventType)
    }
    events := newRowsEvent(t, action, ev.Rows, e.Header)
    return c.eventHandler.OnRow(events)
}

这是Canal里每个连接接收到数据之后按照类型 处理row类型事件的接口, 经过加工之后再交给最终的handler(需要自己实现). 例如MysqlToAll的实现:

代码语言:javascript
复制
func (this *CommonEventHandler) OnRow(e *canal.RowsEvent) error {
    log.Debug("OnRow, action:%s row's count:%d", e.Action, len(e.Rows))
    entity := &common.RawLogEntity{}
    entity.Action = e.Action
    entity.Rows = e.Rows
    entity.TableName = e.Table.Name
    entity.Header = []string{}
    entity.HeaderMap = map[string]int{}
    entity.ValueMap = map[string]interface{}{}
 
    for columnIndex, currColumn := range e.Table.Columns {
        entity.Header = append(entity.Header, currColumn.Name)
        entity.HeaderMap[currColumn.Name] = columnIndex
        entity.ValueMap[currColumn.Name] = e.Rows[len(e.Rows)-1][columnIndex]
    }
    this.CurrOutput.Write(entity)
 
    return nil
}

主要是OnRow,其他接口就不再赘述了.

这里强调一下, 关于上面这个OnRow里接受到rows条数,我实测会出现如下结果:

代码语言:javascript
复制
 [debug  ] OnRow, action:update row's count:2980
 [debug  ] OnRow, action:update row's count:2982
 [debug  ] OnRow, action:update row's count:2118
 [debug  ] OnRow, action:insert row's count:1
 [debug  ] OnRow, action:insert row's count:1
 [debug  ] OnRow, action:update row's count:2
 [debug  ] OnRow, action:insert row's count:1

很清楚了吧..然后附上库里RowsEvent 的定义和注释会更明了:

代码语言:javascript
复制
// RowsEvent is the event for row replication.
type RowsEvent struct {
    Table  *schema.Table
    Action string
    // changed row list
    // binlog has three update event version, v0, v1 and v2.
    // for v1 and v2, the rows number must be even.
    // Two rows for one event, format is [before update row, after update row]
    // for update v0, only one row for a event, and we don't support this version.
    Rows [][]interface{}
    // Header can be used to inspect the event
    Header *replication.EventHeade
}

最后说一下MysqlToAll的一些不足之处, 也是我为什么没用它的原因:

1. rowdata和posdata没分离channal, 我分离后速度从6w/s提升到6.2w/s

2.OnRow回调接受到的rows并不只有一条数据, 他的使用方式会丢失数据

3.有很多为解决通用性而写的代码,会些许影响整体效率

补充:这里有一个开源的基于go-mysql/canal 的库wj596/go-mysql-transfe

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档