前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >记录一次MySQL大表拆分和迁移

记录一次MySQL大表拆分和迁移

作者头像
素履coder
发布2022-10-28 10:42:50
1.3K0
发布2022-10-28 10:42:50
举报
文章被收录于专栏:素履coder素履coder

1. 背景#

最近遇到一个关于MySQL单表过大的问题,该表存放的主要是日志文件,且其中有一个字段存放的数据过大,导致占用空间过大以及查询效率的降低,这种设计其实是不合理的。目前该表占用1.2T容量,数据量超过3亿条,而这个RDS数据库的容量总共就2T,且由于种种原因无法扩容,迫不得已急需给出解决方案。

2. 解决方案#

根据上面的背景,可得出以下这些问题,也给出了解决方案:

问题

解决方法

1

某字段占用空间较大,在MySQL中为text类型,存储的是json格式的数据,该字段平均占用空间为5KB

对字段进行压缩,把json格式压缩成字节序列,压缩后可节省5倍空间左右

2

单表数据量过大,而我们的业务是基本只取本年的数据,该表中很多不使用的数据导致查询效率降低

对该表按年份分表,本年的数据为热数据,之前的数据为冷数据

3

RDS服务器容量不足且无法扩容

考虑到以后业务数据的增长,我们决定直接买另一台RDS服务器,把冷数据迁移到新RDS服务器

具体步骤:在原先的数据库批量压缩字段 —> 批量迁移数据到新数据库

2.1 压缩代码#

由于数据库一直在业务上被使用着,无法停下来专门给我们做这些处理,那么为了降低对业务的影响,我们只能选择在节假日或者晚上凌晨时候操作,因此所有的脚本文件都需要提前写好,到时候直接做批量处理。

现在先给出压缩与解压代码如下,把json的数据压缩成字节格式,然后采用Base64编码格式存储:

代码语言:javascript
复制
func Compress(s string, jsonFlag bool) string {
	// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
	if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
		return s
	}
	data := []byte(s)
	// 压缩
	data = zipBytes(data)
	// 转base64 存储
	return setBase64(data)
}

func UnCompress(s string, jsonFlag bool) string {
	// 开启 了 json 验证 并且 是有效的 json ,则 直接返回 原来的值
	if jsonFlag && json.Valid([]byte(s)) || len(s) == 0 {
		return s
	}
	data, err := getBase64(s)
	if err != nil {
		log.Error("解析报错")
		return ""
	}
	data = uZipBytes(data)
	return string(data)
}

func setBase64(data []byte) string {
	a := base64.StdEncoding.EncodeToString(data)
	return a
}

func getBase64(data string) ([]byte, error) {
	a, err := base64.StdEncoding.DecodeString(data)

	return a, err
}

func zipBytes(data []byte) []byte {
	var in bytes.Buffer
	z := zlib.NewWriter(&in)
	z.Write(data)
	z.Close()
	return in.Bytes()
}

//zip解压
func uZipBytes(data []byte) []byte {
	var out bytes.Buffer
	var in bytes.Buffer
	in.Write(data)
	r, _ := zlib.NewReader(&in)
	r.Close()
	io.Copy(&out, r)
	return out.Bytes()
}

2.2 更新具体步骤#

上面的代码是对单条数据的压缩,现在需要从数据库查出数据,然后批量的压缩,采用更新的操作,需要考虑如下问题:

① 每一批取出多少条数据

② 批量压缩采用goroutine并发压缩

③ 批量更新如何操作

2.2.1 步骤一查询#

由于数据超过3亿条,因此要保证查询效率,不然查询速度会非常慢。具体做法:

  • 每次查询1万条数据
  • 查询的时候只查询需要的字段,即id字段和需要压缩的字段,id字段为主键,采用主键索引
  • 采用分页查询的方式,即每次查询完记录最后一条数据的id,下一次查询直接在这个id的基础上查询,语句如:select id, detail from log_table where id > last_id limit 10000;这里假设要压缩的字段名为detail,表名为log_table,这种方式不仅命中了索引,还避免了全表扫描
2.2.2 步骤二压缩#

上面查出了1万条数据,接着要做的就是批量压缩,如果采用for循环1个1个的压缩,那么效率必然不是最高的,可以利用go语言并发的优势,把1万条数据分成10组,每组1千条数据,让这10组数据同时进行压缩,代码如下:

代码语言:javascript
复制
func() {

    ......

    var wg sync.WaitGroup
    wg.Add(10)
    flagMap := make(map[int64]int, 0)
    for i := 0; i < 10; i++ {
        go func(ii int) {
            defer wg.Done()

            defer func() { // 防止发生panic之后暂停
                err := recover()
                if err != nil {
                    log.Info("err:", err)
                }
            }()

            rwLock.Lock()
            for j := ii * 1000; j < (ii+1)*1000; j++ { // 给goroutine划分执行区间
                flag := false
                detailTemp := processLogData[j].Detail
                detailTemp, flag = util.CompressForUpdate(detailTemp, true)
                if flag { // 如果不是有效json,则跳过
                    log.Info("该条json压缩失败或为空,已跳过,id:", processLogData[j].Id)
                    flagMap[processLogData[j].Id] = 2
                } else {
                    processLogData[j].Detail = detailTemp
                }
            }
            rwLock.Unlock()
        }(i)
    }
    wg.Wait() // 等待所有goroutine执行完毕

    ......

}

func CompressForUpdate(s string, jsonFlag bool) (string, bool) {
	// 开启 了 json 判断 并且 传入的 值 并非 是 有效的 json,直接返回原来的值
	if jsonFlag && !json.Valid([]byte(s)) || len(s) == 0 {
		return s, true
	}
	data := []byte(s)
	data = zipBytes(data)
	return setBase64(data), false
}

上面需要注意在每组goroutine内需要采用读写锁锁住,防止并发安全问题,因为里面有一个用来判空的map,是否需要判空根据不同业务决定

2.2.3 步骤三更新#

如果一条一条更新速度是极慢的,所以不推荐这种方法,这里采用的是批量更新的方式,经过试验,更新数据库字段,一次更新1000条,更新十次,会比一次更新1万条速度快很多,所以下面函数的tempList切片放的数据量是1千条,需要循环该函数10次才是1万条

代码语言:javascript
复制
func batchUpdate(tableName, fieldName string, tempList []models.LogTable) string {
	/*
	例子:
		UPDATE tableName
		    SET fieldName = CASE id
		        WHEN 1 THEN 'value'
		        WHEN 2 THEN 'value'
		        WHEN 3 THEN 'value'
		    END
		WHERE id IN (1,2,3);
	*/
	var sqlStr string
	sqlStr = "UPDATE " + tableName + " SET " + fieldName + " = CASE id"
	for i := 0; i < len(tempList); i++ {
		id := fmt.Sprintf("%d", tempList[i].Id)
		sqlStr += " WHEN " + id + " THEN " + tempList[i].Detail
		if i == len(tempList)-1 {
			sqlStr += " END"
		}
	}
	sqlStr += " WHERE id IN ("
	for i := 0; i < len(tempList); i++ {
		sqlStr += fmt.Sprintf("%d", tempList[i].Id)
		if i != len(tempList)-1 {
			sqlStr += ","
		} else {
			sqlStr += ");"
		}
	}

	return sqlStr
}

当脚本写好后需要把程序放到服务器上跑,且在内网的环境下进行。经过实验,查询+压缩+更新 1万条数据共花费4s左右时间,那么3亿条数据需要花费大概33小时

2.3 迁移具体步骤#

迁移主要包括查询和插入两个步骤,查询和上面的查询方法一样;经过比较,批量插入的时候每500条插入一次速度最快

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-10-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 背景#
  • 2. 解决方案#
    • 2.1 压缩代码#
      • 2.2 更新具体步骤#
        • 2.2.1 步骤一查询#
        • 2.2.2 步骤二压缩#
        • 2.2.3 步骤三更新#
      • 2.3 迁移具体步骤#
      相关产品与服务
      文件存储
      文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档