Normally we export data with mysqldump and then import it with the mysql client. We can watch the progress with the script from the previous article, but the waiting itself is not reduced.
The mysql import is single-threaded and slow. If we split the .sql file into multiple files and import them concurrently, it gets much faster.
I had actually tested this before, but the results were poor: even 32-way concurrency ran at about the same speed, because back then the INSERT statements were spread evenly across the files, which does not speed up the import at all.
Learning from that lesson, this time the split is one file per table. Importing those files concurrently then really does improve the speed.
The dumps produced by mysqldump on MySQL 5.7 and 8.0 are almost identical, with only small differences. The layout is:
- client and server version information
- character set and other variable settings
- SQL_LOG_BIN
- GLOBAL.GTID_PURGED (here on 8.0)
- CHANGE MASTER
- db1
  - create table (table DDL)
  - insert into (table data)
  - trigger (triggers)
  - v1 view (view stubs)
- db2 ...
- db1
  - event
  - routine
- db2
- ...
- db1
  - view (final view definitions)
- db2 ...
- GLOBAL.GTID_PURGED (here on 5.7)
- character set and other variable settings (restored)
So we can write a script around this layout. The MySQL developers apparently thought of this too: the dump contains distinctive marker comments. They map as follows:
GTID, 8.0 (at the beginning):
--
-- GTID state at the beginning of the backup
--
GTID, 5.7 (at the end):
--
-- GTID state at the end of the backup
--
CHANGE MASTER:
--
-- Position to start replication or point-in-time recovery from
--
CREATE DATABASE:
--
-- Current Database: `ibd2sql`
--
Table structure:
--
-- Table structure for table `AllTypesExample`
--
INSERT:
--
-- Dumping data for table `AllTypesExample`
--
View stub (placed with the tables), 8.0:
--
-- Temporary view structure for view `v1`
--
View stub (placed with the tables), 5.7:
--
-- Temporary table structure for view `v_1`
--
EVENT:
--
-- Dumping events for database 'db2'
--
ROUTINES:
--
-- Dumping routines for database 'db2'
--
VIEW (at the end):
--
-- Final view structure for view `v_1`
--
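These markers can be recognized with plain prefix checks. A minimal sketch of the idea (the `classify` helper and its labels are mine, not from the original script):

```python
# Classify a mysqldump comment line by the section it introduces.
# Prefixes taken from the marker list above.
MARKERS = {
    "-- GTID state at the ": "gtid",
    "-- Position to start ": "change_master",
    "-- Current Database:": "create_database",
    "-- Table structure for table": "table_ddl",
    "-- Dumping data for table": "table_data",
    "-- Temporary view structure for view": "view_stub",   # 8.0 wording
    "-- Temporary table structure for view": "view_stub",  # 5.7 wording
    "-- Dumping events for database": "events",
    "-- Dumping routines for database": "routines",
    "-- Final view structure for view": "view_final",
}

def classify(line):
    """Return the section type a dump comment line introduces, or None."""
    for prefix, kind in MARKERS.items():
        if line.startswith(prefix):
            return kind
    return None

print(classify("-- Table structure for table `AllTypesExample`"))  # table_ddl
print(classify("-- Final view structure for view `v_1`"))          # view_final
```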
The idea behind the concurrent import is simple: just put the import processes in the background. The main thing to get right is the import order.
When importing a 5.7 dump into 8.0, note that the DROP and CREATE statements for the statistics tables cannot be executed; either comment them out by hand before importing, or collect the statistics manually afterwards.
MysqlDumpSplitSQL.py
Written in Python 2 (Python 3 has some encoding issues). It splits a mysqldump .sql file into multiple files, laid out as below. For an 8.0 dump there is additionally a dbs/special.sql holding the statistics-table data:
splitByddcw_20240229_165143
├── dbs
│   ├── chartest
│   │   ├── appmap.sql
│   │   └── app.sql
│   ├── create.sql
│   ├── gtid.sql
│   ├── master_info.txt
│   ├── T20240227_2
│   └── test
│       └── test.sql
├── events
│   ├── chartest.sql
│   └── test.sql
├── routines
│   ├── chartest.sql
│   ├── db1.sql
└── views
    ├── db1.sql
    └── t20240227.sql
It is fast (really fast): a 1.7 GB file takes only about 4 seconds to split.
python MysqlDumpSplitSQL.py t20240228_alldb.sql
If you only need certain tables, use --table tablename to match them; regular expressions are supported.
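The matching works the same way as the script's match_table(): any `*` in the --database/--table values is expanded to `.*`, and the result is searched against "db.table". A quick sketch (the helper names are mine):

```python
import re

# "*" in the filter becomes ".*", then the pattern is searched
# against "dbname.tablename", mirroring the script's PATTERN.
def make_pattern(database="*", table="*"):
    return database.replace("*", ".*") + r"\." + table.replace("*", ".*")

def match_table(pattern, dbname, tablename):
    return re.search(pattern, dbname + "." + tablename) is not None

pat = make_pattern(table="app.*")              # tables starting with "app"
print(match_table(pat, "chartest", "appmap"))  # True
print(match_table(pat, "test", "test"))        # False
```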
Full usage:
python MysqlDumpSplitSQL.py -h
usage: MysqlDumpSplitSQL.py [-h] [--version] [--database DATABASE]
                            [--table TABLE] [--output-dir OUTPUT_DIR]
                            [--presql PRESQL] [--file FILENAME]
                            [--log-file LOG_FILENAME]
                            [files [files ...]]

Split a mysqldump .sql file.

positional arguments:
  files                 the mysqldump .sql file(s) to split

optional arguments:
  -h, --help            show this help message and exit
  --version, -v, -V     version information
  --database DATABASE   only split this database
  --table TABLE         only split this table
  --output-dir OUTPUT_DIR
                        output directory
  --presql PRESQL       prepended to every .sql file, e.g. set
                        sql_log_bin=off; set names utf8mb4
  --file FILENAME       the mysqldump .sql file to split
  --log-file LOG_FILENAME
                        log file
testparallel.sh
Imports in the order mysqldump wrote the dump, additionally checks a few special parameters, and runs the imports concurrently. That is about it. The concurrency logic: put each import in the background, then loop and poll; whenever one finishes, start the next. Since it works at file granularity, there is a straggler effect (the slowest file sets the floor).
Edit the connection info and the concurrency level in the script, then run it with the split directory from above as the argument; the script works out the rest on its own.
sh testparallel.sh splitByddcw_20240229_165143
The tunable settings look like this:
#tunable parameters
CONCURRENCY=4 #number of concurrent imports
SLEEP_INTERNAL="0.01" #poll every SLEEP_INTERNAL seconds for finished import processes
IGNORE_GTID_CHECK="0" #1 = do not check whether GTIDs already exist
IGNORE_FUCNTION_CREATOR="0" #1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0" #1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log" #import log, same content as the console output
DIRNAME=$1 #directory containing the split mysqldump SQL files
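The poll-and-refill loop described above can be sketched in a few lines of Python with subprocess. This is a toy model, not the real script: the `sleep` commands stand in for actual `mysql ... < file.sql` invocations.

```python
import subprocess
import time

# Bounded worker pool, same idea as testparallel.sh: keep at most
# `concurrency` background processes; poll, drop finished ones, refill.
def parallel_run(commands, concurrency=4, poll_interval=0.01):
    running = []
    for cmd in commands:
        while len(running) >= concurrency:          # pool is full, wait
            time.sleep(poll_interval)
            running = [p for p in running if p.poll() is None]
        running.append(subprocess.Popen(cmd, shell=True))
    for p in running:                               # wait for stragglers
        p.wait()

# Eight fake "imports" at 4-way concurrency run in roughly two waves.
parallel_run(["sleep 0.2"] * 8, concurrency=4)
```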
All this talk is worth less than a demo. The log output is long, so the middle part is omitted.
To verify correctness, we record checksums of the relevant tables before and after the import with the commands below, to show the script really is sound.
The checksum commands look like this:
Checksums before the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort | mysql -h127.0.0.1 -P3314 -p123456 > /tmp/before_check.txt
Checksums after the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort | mysql -h127.0.0.1 -P3314 -p123456 > /tmp/after_check.txt
Compare the two:
diff /tmp/before_check.txt /tmp/after_check.txt
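diff is enough here, but a small sketch that names the mismatching tables can be friendlier. It assumes the files contain mysql's tab-separated `Table<TAB>Checksum` output, as produced by the commands above:

```python
# Compare two CHECKSUM TABLE result files and list tables whose
# checksums differ (or that disappeared). Header lines are skipped.
def load_checksums(path):
    sums = {}
    with open(path) as f:
        for line in f:
            parts = line.rstrip("\n").split("\t")
            if len(parts) == 2 and parts[1] != "Checksum":
                sums[parts[0]] = parts[1]
    return sums

def changed_tables(before_path, after_path):
    before = load_checksums(before_path)
    after = load_checksums(after_path)
    return [t for t in before if after.get(t) != before[t]]

# e.g. changed_tables("/tmp/before_check.txt", "/tmp/after_check.txt")
```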
The export needs no special treatment; just dump everything:
17:05:28 [root@ddcw21 mysqldump_t20240226]#mysqldump -h127.0.0.1 -P3314 -p123456 --events --triggers --single-transaction --routines --master-data=2 -A > t20240229_alldb.sql
mysqldump: [Warning] Using a password on the command line interface can be insecure.
17:06:48 [root@ddcw21 mysqldump_t20240226]#
17:06:48 [root@ddcw21 mysqldump_t20240226]#ll -ahrlt t20240229_alldb.sql
-rw-r--r-- 1 root root 1.7G Feb 29 17:06 t20240229_alldb.sql
Then split it with the script:
17:07:55 [root@ddcw21 mysqldump_t20240226]#python MysqlDumpSplitSQL.py t20240229_alldb.sql
2024-02-29 17:08:03 CLIENT_VERSION: 8.0.28, SERVER_VERSION: 8.0.28 FILE_HEADER:
-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!50503 SET NAMES utf8mb4 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!50606 SET @OLD_INNODB_STATS_AUTO_RECALC=@@INNODB_STATS_AUTO_RECALC */;
/*!50606 SET GLOBAL INNODB_STATS_AUTO_RECALC=OFF */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
SET @MYSQLDUMP_TEMP_LOG_BIN = @@SESSION.SQL_LOG_BIN;
SET @@SESSION.SQL_LOG_BIN= 0;
2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.component BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.component FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.db BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.db FINISH. COST TIME: 0.0 seconds
........................................
(middle of the log omitted; too long)
........................................
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ TABLE FOR test.test BEGIN
2024-02-29 17:08:07 READ TABLE FOR test.test FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ EVENT FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ EVENT FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ALL FINISH
2024-02-29 17:08:07 FILENAME : /root/mysqldump_t20240226/t20240229_alldb.sql
2024-02-29 17:08:07 OUTPUT_DIR : /root/mysqldump_t20240226/splitByddcw_20240229_170803
2024-02-29 17:08:07 LOG_FILENAME : /root/mysqldump_t20240226/SplitMysqlDumpSQL.log
2024-02-29 17:08:07 COST TIME : 4.02 SECONDS. TABLES COUNT: 412
2024-02-29 17:08:07 WARNING : 0
17:08:07 [root@ddcw21 mysqldump_t20240226]#
Just 4 seconds -_-
Importing directly fails because of the GTID problem:
17:09:54 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803
********** BEGIN CHECK **************
2024-02-29 17:10:03 CONNCT SUCCESS.
2024-02-29 17:10:03 CHECK_CONN OK
2024-02-29 17:10:03 CURRENT GTID: b68e2434-cd30-11ec-b536-000c2980c11e:1-35 FAILED!
17:10:03 [root@ddcw21 mysqldump_t20240226]#
So we edit the script to ignore the GTID check, and enable IGNORE_FUCNTION_CREATOR as well:
IGNORE_GTID_CHECK="1"
IGNORE_FUCNTION_CREATOR="1"
Then import again:
17:11:38 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803
********** BEGIN CHECK **************
2024-02-29 17:11:39 CONNCT SUCCESS.
2024-02-29 17:11:39 CHECK_CONN OK
2024-02-29 17:11:39 CHECK_GTID OK
2024-02-29 17:11:39 MYSQL VERSION: 8 0 28
2024-02-29 17:11:39 CHECK_VERSION OK
2024-02-29 17:11:39 disabled_storage_engines= OK
2024-02-29 17:11:39 log_bin_trust_function_creators=1 OK
........................................
(middle of the log omitted; too long)
........................................
#################### IMPORT APP VIEWS #####################
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 APP DATABASE COUNT: 24 APP TABLE COUNT: 377 APP DATA IMPORT COST_TIME: 215 SECONDS.
2024-02-29 17:15:16 IMPORT ALL FINISH. TOTAL COST TIME 217 SECONDS. FAILED COUNT: 1
2024-02-29 17:15:16 ERROR COUNT: 1
2024-02-29 17:15:16
splitByddcw_20240229_170803/dbs/gtid.sql IMPORT FAILED
217 seconds. Not bad... most of it is one huge table (170 seconds).
There is one ERROR: gtid.sql failed to import. We can ignore it, since we chose to skip the GTID check ourselves. Alternatively, run reset master first and then import that gtid.sql.
Importing the dump directly errors out on GTIDs, so we run reset master first:
17:19:53 [root@ddcw21 mysqldump_t20240226]#mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql
mysql: [Warning] Using a password on the command line interface can be insecure.
ERROR 3546 (HY000) at line 26: @@GLOBAL.GTID_PURGED cannot be changed: the added gtid set must not overlap with @@GLOBAL.GTID_EXECUTED
After reset master, start the import (prefixed with time, otherwise there is nothing to compare):
17:21:22 [root@ddcw21 mysqldump_t20240226]#time mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql
mysql: [Warning] Using a password on the command line interface can be insecure.
real 4m39.067s
user 0m11.893s
sys 0m4.000s
17:26:02 [root@ddcw21 mysqldump_t20240226]#
That feels slow by comparison.
I forgot to verify data consistency this time... but it was checked in earlier test runs and matched.
Splitting took 4 seconds and the import 217 seconds (3min37s).
| Import method | Time |
|---|---|
| plain mysql import | 4min39s |
| 4-way concurrent | 3min37s |
| 8-way concurrent | 3min12s |
There is a real gain, but the straggler effect limits it.
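The straggler ("short board") effect is easy to see with a toy calculation: with file-level parallelism, total time can never drop below the largest single file, no matter how many workers you add. The durations below are made up for illustration:

```python
# Greedy longest-first scheduling of per-file import times onto workers;
# the makespan (total wall time) is bounded below by the biggest file.
def makespan(durations, workers):
    loads = [0.0] * workers
    for d in sorted(durations, reverse=True):
        i = loads.index(min(loads))   # give the job to the idlest worker
        loads[i] += d
    return max(loads)

files = [170, 20, 15, 10, 5, 5, 3, 2]     # hypothetical seconds per file
for w in (1, 4, 8):
    print(w, "workers:", makespan(files, w))
# Adding workers past 4 barely helps: the 170s table dominates.
```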
(Note: a 5.7 dump contains DROP and CREATE statements for the statistics tables, but an 8.0 dump has only INSERT IGNORE INTO.)
GitHub: https://github.com/ddcw/ddcw/tree/master/python/MySQL%E5%B9%B6%E5%8F%91%E5%AF%BC%E5%85%A5
MysqlDumpSplitSQL.py:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# write by ddcw @https://github.com/ddcw
# Split a mysqldump .sql file (filtering not fully supported yet)
# This script does not restore session variables, so importing its output with `source` is not recommended
import sys,os
import re
import datetime,time
import errno
import argparse,glob
# parse command-line arguments
def _argparse():
    parser = argparse.ArgumentParser(add_help=True, description='Split a mysqldump .sql file.')
    parser.add_argument('--version', '-v', '-V', action='store_true', dest="VERSION", default=False, help='version information')
    parser.add_argument('--database', dest="DATABASE", default="*", help='only split this database')
    parser.add_argument('--table', dest="TABLE", default="*", help='only split this table')
    parser.add_argument('--output-dir', dest="OUTPUT_DIR", default="", help='output directory')
    #parser.add_argument('--parallel', dest="PARALLEL", default=4, help='import concurrency')
    parser.add_argument('--presql', dest="PRESQL", default="", help='prepended to every .sql file, e.g. set sql_log_bin=off; set names utf8mb4')
    #parser.add_argument('--postsql', dest="POSTSQL", help='appended to every .sql file, e.g. set sql_log_bin=on;')
    #parser.add_argument('--mysqlbin', dest="MYSQLBIN", default="mysql -c ", help='mysql command used for the import')
    parser.add_argument('--file', dest="FILENAME", default="", help='the mysqldump .sql file to split')
    parser.add_argument('--log-file', dest="LOG_FILENAME", default="", help='log file')
    parser.add_argument("files", nargs="*", help="the mysqldump .sql file(s) to split")
    if parser.parse_args().VERSION:
        print('VERSION: v0.1')
        sys.exit(0)
    return parser.parse_args()
# check whether db.table matches the filter pattern
def match_table(dbname,tablename):
    dtname = str(dbname) + "." + tablename
    return True if re.search(PATTERN,dtname) else False
def log(*args):
    msg = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " " + " ".join([ str(x) for x in args ]) + "\n"
    print(msg[:-1])
    LOG_FD.write(msg)
def read_header(f):
    client_version = "unknown"
    server_version = "unknown"
    gtid = "" # for 5.7 compatibility
    master_info = ""
    header = ""
    while True:
        old_offset = f.tell()
        data = f.readline()
        if data[:17] == "-- Server version":
            server_version = data.split()[-1:][0]
        elif data[:14] == "-- MySQL dump ":
            client_version = data.split()[5]
        elif data[:21] == "-- GTID state at the ":
            tdata = ""
            f.seek(old_offset,0)
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            gtid = tdata
        elif data[:21] == "-- Position to start ":
            tdata = ""
            f.seek(old_offset,0)
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            master_info = tdata
        elif data[:3] == "/*!" or data[:4] == "SET ":
            header += data
        elif data[:20] == "-- Current Database:":
            f.seek(old_offset,0)
            break
    return header,client_version,server_version,gtid,master_info
def read_view(f,header,db):
    view_name = "views" + "/" + str(db) + ".sql"
    with open(view_name, 'w') as fd:
        fd.write(header)
        fd.write("USE `"+db+"`;\n")
        while True:
            old_offset = f.tell()
            data = f.readline()
            if data[:3] == "-- " and data[:4] != "-- F":
                f.seek(old_offset,0)
                break
            fd.write(data)
def read_routine(f,header,db):
    routine_filename = "routines"+"/"+str(db)+".sql"
    with open(routine_filename, 'w') as fd:
        fd.write(header)
        data = f.readline()
        data += f.readline()
        fd.write("USE `"+db+"`;\n")
        fd.write(data)
        while True:
            old_offset = f.tell()
            data = f.readline()
            if data[:3] == "--\n":
                #f.seek(old_offset,0)
                break
            fd.write(data)
def read_event(f,header,db):
    event_filename = "events/"+db+".sql"
    with open(event_filename, 'w') as fd:
        fd.write(header)
        data = f.readline()
        data += f.readline()
        fd.write("USE `"+db+"`;\n")
        fd.write(data)
        while True:
            data = f.readline()
            fd.write(data)
            if data == "DELIMITER ;\n" or data[:3] == "--\n":
                break
        #data = f.readline()
class NULLPASS(object):
    def __init__(self,*args):
        pass
    def write(self,*args):
        pass
def read_table(f,header,db):
    data = f.readline()
    table_name = re.compile("`(.+)`").findall(data)[0]
    data += "--\n" + f.readline()
    filename = "dbs/" + db + "/" + table_name + ".sql"
    with open(filename,'w') as fd:
        if not match_table(db,table_name):
            #log(db+"."+table_name+" NOT MATCH "+PATTERN+" SKIP IT!")
            fd = NULLPASS()
        fd.write(header)
        fd.write(data)
        fd.write("USE `"+db+"`;")
        # read the table structure
        while True:
            data = f.readline()
            fd.write(data)
            if data == "--\n":
                break
        # read the table data
        old_offset = f.tell()
        data = f.readline()
        if data[:26] == "-- Dumping data for table ":
            data += f.readline() + f.readline()
            fd.write(data)
            while True:
                data = f.readline()
                fd.write(data)
                if data == "--\n": # there may be triggers, so we cannot stop at UNLOCK TABLES;
                    break
        else:
            f.seek(old_offset,0)
    return True
# create a directory if it does not already exist
def mkdir_exists(dirname):
    try:
        os.makedirs(dirname)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise
if __name__ == '__main__':
    START_TIME = time.time()
    parser = _argparse()
    FILTER_DATABASE = parser.DATABASE
    FILTER_TABLE = parser.TABLE
    PATTERN = str(FILTER_DATABASE).replace("*",".*") + "\." + str(FILTER_TABLE).replace("*",".*")
    filelist = []
    for pattern in parser.files:
        filelist += glob.glob(pattern)
    fileset = filelist
    FILENAME = parser.FILENAME if parser.FILENAME != "" and os.path.exists(parser.FILENAME) else ""
    FILENAME = fileset[0] if len(fileset) >= 1 and FILENAME == "" and os.path.exists(fileset[0]) else ""
    if FILENAME == "":
        print('At least one mysqldump .sql file is required')
        sys.exit(1)
    FILENAME = os.path.abspath(FILENAME) # use an absolute path, since we chdir to the output dir later
    LOG_FILENAME = "SplitMysqlDumpSQL.log" if parser.LOG_FILENAME == "" else parser.LOG_FILENAME
    LOG_FILENAME = os.path.abspath(LOG_FILENAME)
    OUTPUT_DIR = parser.OUTPUT_DIR if parser.OUTPUT_DIR != "" else "splitByddcw_" + str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
    OUTPUT_DIR = os.path.abspath(OUTPUT_DIR)
    #print("READ FILENAME: "+FILENAME+" OUTPUT_DIR: "+OUTPUT_DIR)
    mkdir_exists(OUTPUT_DIR)
    # no free-space check (lazy); check it yourself (about 1.1x the .sql file size is needed)
    # create the directory layout
    os.chdir(OUTPUT_DIR) # change the working directory to avoid joining paths everywhere
    mkdir_exists("dbs") # databases and tables
    mkdir_exists("events") # events
    #mkdir_exists("triggers") # triggers
    mkdir_exists("routines") # stored procedures and functions
    mkdir_exists("views") # standalone views (Final view structure), not the Temporary table structure stubs
    f = open(FILENAME,'r')
    LOG_FD = open(LOG_FILENAME,'a')
    CREATE_DB_FD = open('dbs/create.sql','w') # CREATE DATABASE statements
    FILE_HEADER,client_version,server_version,gtid_info,master_info = read_header(f)
    CREATE_DB_FD.write(FILE_HEADER)
    CREATE_DB_FD.write("\n")
    FILE_HEADER = "-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw\n" + FILE_HEADER + "\n" + parser.PRESQL + "\n"
    _msg = "CLIENT_VERSION: " + client_version + " SERVER_VERSION: " + server_version + " FILE_HEADER:\n" + FILE_HEADER
    log(_msg)
    # write the CHANGE MASTER info
    if master_info != "":
        _master_file = "dbs/master_info.txt"
        with open(_master_file, 'w') as _mf:
            _mf.write(master_info)
    # read the database/table sections
    CURRENT_DB = ""
    TABLE_COUNT = 0
    WARNING_COUNT = 0
    while True:
        old_offset = f.tell()
        data = f.readline()
        if data == "": # EOF
            break
        elif data[:20] == "-- Current Database:":
            CURRENT_DB = re.compile("`(.+)`").findall(data)[0]
            _dirname = "dbs/" + str(CURRENT_DB)
            mkdir_exists(_dirname)
        elif data[:16] == "CREATE DATABASE ":
            CREATE_DB_FD.write(data)
        #CREATE TABLE, INSERT, CREATE TRIGGER, CREATE VIEW
        elif data[:28] == "-- Table structure for table" or data[:36] == "-- Temporary view structure for view" or data[:37] == "-- Temporary table structure for view":
            TABLE_COUNT += 1
            f.seek(old_offset,0)
            table_name = re.compile("`(.+)`").findall(data)[0]
            log("READ TABLE FOR "+CURRENT_DB+"."+table_name,'BEGIN')
            _st = time.time()
            read_table(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ TABLE FOR "+CURRENT_DB+"."+table_name,"FINISH.","COST TIME: "+ str(round((_et-_st),2)) +" seconds")
        #READ EVENT
        elif data[:31] == "-- Dumping events for database ":
            CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
            f.seek(old_offset,0)
            log("READ EVENT FOR DATABASE "+CURRENT_DB+" BEGIN")
            _st = time.time()
            read_event(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ EVENT FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")
        #READ ROUTINE
        elif data[:33] == "-- Dumping routines for database ":
            CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
            f.seek(old_offset,0)
            log("READ ROUTINE FOR DATABASE "+CURRENT_DB+" BEGIN")
            _st = time.time()
            read_routine(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ ROUTINE FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")
        #READ VIEW
        elif data[:33] == "-- Final view structure for view ":
            f.seek(old_offset,0)
            log("READ VIEW FOR DATABASE "+CURRENT_DB+" BEGIN")
            _st = time.time()
            read_view(f,FILE_HEADER,CURRENT_DB)
            _et = time.time()
            log("READ VIEW FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")
        elif data[:21] == "-- GTID state at the ":
            tdata = ""
            f.seek(old_offset,0)
            log("READ GTID PURGED")
            while True:
                _data = f.readline()
                tdata += _data
                if _data[-2:] == ";\n":
                    break
            gtid_info += tdata
        elif data[:26] == "-- Dumping data for table ": # system tables, mainly statistics
            # ONLY FOR MYSQL 8.x
            _filename = "dbs/special.sql"
            with open(_filename,'a') as fd:
                fd.write(FILE_HEADER)
                fd.write("\n")
                fd.write(data)
                fd.write("USE `"+CURRENT_DB+"`;\n")
                while True:
                    _old_offset = f.tell()
                    data = f.readline()
                    if data[:3] == "-- ":
                        f.seek(_old_offset,0)
                        break
                    fd.write(data)
        elif data[:20] == "-- Dump completed on":
            log("READ ALL FINISH")
            break
        elif data[:4] == "USE " or data == "\n" or data == "--\n":
            pass
        elif data[:3] == "/*!" or data[:4] == "SET ": # skip the trailing comments and a few SETs
            pass
        else:
            WARNING_COUNT += 1
            log("========== SKIP ==========\n"+data)
    # write the GTID info; 5.7 puts it at the end of the dump, hence written last
    if gtid_info != "":
        _gtid_file = "dbs/gtid.sql"
        with open(_gtid_file, 'w') as _mf:
            _mf.write(FILE_HEADER)
            _mf.write("\n")
            _mf.write(gtid_info)
    STOP_TIME = time.time()
    log("FILENAME :",FILENAME)
    log("OUTPUT_DIR :",OUTPUT_DIR)
    log("LOG_FILENAME :",LOG_FILENAME)
    log("COST TIME :",str(round((STOP_TIME - START_TIME),2)), "SECONDS. TABLES COUNT:",TABLE_COUNT)
    log("WARNING :",WARNING_COUNT)
    f.close()
    LOG_FD.close()
    CREATE_DB_FD.close()
testparallel.sh:
#!/usr/bin/env bash
#write by ddcw @https://github.com/ddcw
#tunable parameters
CONCURRENCY=4 #number of concurrent imports
SLEEP_INTERNAL="0.01" #poll every SLEEP_INTERNAL seconds for finished import processes
IGNORE_GTID_CHECK="1" #1 = do not check whether GTIDs already exist
IGNORE_FUCNTION_CREATOR="1" #1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0" #1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log" #import log, same content as the console output
DIRNAME=$1 #directory containing the split mysqldump SQL files
#MySQL connection info
MYSQL_COM="mysql -h127.0.0.1 -p123456 -P3314 -uroot "
touch ${LOGFILE}
#internal variables, do not modify
MYSQL_VERSION=() #MySQL server version
export LANG="en_US.UTF-8" #set LANG
POSTMSG="" #messages to print at the end, collected along the way
APP_TABLE_TIME_COUNT="0" #time spent importing application tables
DB_COUNT=0 #number of databases imported, excluding system databases
FILES_COUNT=0 #number of files imported, application tables only
FAIL_COUNT_1=`grep ' FAILED$' ${LOGFILE} | wc -l` #failures already present in an older log
ERROR_COUNT=0 #error counter
exit1(){
    echo "`date "+%Y-%m-%d %H:%M:%S"` ${@}"
    exit 1
}
[ "${DIRNAME}" == "" ] && exit1 "sh $0 splitByddcw_XXXXXX"
[ -d ${DIRNAME} ] || exit1 "${DIRNAME} does not exist"
DIRNAME=${DIRNAME%/} #normalize: strip a trailing / to make path joining easier
log(){
    _msg="$(date '+%Y-%m-%d %H:%M:%S') $@"
    echo -e ${_msg} # | tee -a ${LOGFILE}
    echo -e ${_msg} >> ${LOGFILE}
}
import_sql(){
    file=$1
    ts=`date +%s`
    log "IMPORT ${file} BEGIN..."
    #${MYSQL_COM} < $file >>${LOGFILE} 2>&1 && log "IMPORT ${file} SUCCESS." || log "IMPORT ${file} FAILED"
    if ${MYSQL_COM} < $file >>${LOGFILE} 2>&1;then
        log "IMPORT ${file} SUCCESS."
    else
        log "IMPORT ${file} FAILED"
        ((ERROR_COUNT++))
        return 1
    fi
    te=`date +%s`
    log "IMPORT ${file} FINISH. cost $[ ${te} - ${ts} ] seconds"
    return 0
}
CHECK_CONN(){
    if ${MYSQL_COM} -e "select 1+1" >>${LOGFILE} 2>&1; then
        log "CONNCT SUCCESS."
    else
        exit1 "CONNCT FAILD. Please read log (${LOGFILE}) FAILED!"
    fi
    log "CHECK_CONN OK"
}
CHECK_GTID(){
    current_gtid=`${MYSQL_COM} -NB -e "select @@GLOBAL.GTID_EXECUTED" 2>>${LOGFILE}`
    if [ "${current_gtid}" != "" ] && [ "${IGNORE_GTID_CHECK}" != "1" ];then
        exit1 "CURRENT GTID: ${current_gtid} FAILED!"
    fi
    log "CHECK_GTID OK"
}
CHECK_VERSION(){
    IFS='.' read -r -a MYSQL_VERSION <<< `${MYSQL_COM} -NB -e "select @@version" 2>>${LOGFILE}`
    if [ "${MYSQL_VERSION[0]}" == "5" ] || [ "${MYSQL_VERSION[0]}" == "8" ] ;then
        log "MYSQL VERSION: ${MYSQL_VERSION[@]}"
    else
        exit1 "ONLY FOR MYSQL 5/8, CURRENT MYSQL VERSION:${MYSQL_VERSION[@]} FAILED!"
    fi
    log "CHECK_VERSION OK"
}
CHECK_VARIABELS(){
    disabled_storage_engines=`${MYSQL_COM} -NB -e "select @@disabled_storage_engines" 2>>${LOGFILE}`
    if echo "${disabled_storage_engines}" | grep -i "MyISAM" >/dev/null;then
        if [ "${IGNORE_DISABLE_ENGINE}" != "1" ];then
            exit1 "disabled_storage_engines=${disabled_storage_engines} FAILED"
        fi
    else
        log "disabled_storage_engines=${disabled_storage_engines} OK"
    fi
    log_bin_trust_function_creators=`${MYSQL_COM} -NB -e "select @@log_bin_trust_function_creators" 2>>${LOGFILE}`
    if [ "${log_bin_trust_function_creators}" == "0" ] && [ "${IGNORE_FUCNTION_CREATOR}" != "1" ];then
        exit1 "log_bin_trust_function_creators=${log_bin_trust_function_creators} FAILED"
    else
        log "log_bin_trust_function_creators=${log_bin_trust_function_creators} OK"
    fi
    log "CHECK_VARIABELS OK"
}
IMPORT_CHANGE_MASTER(){
    echo ""
}
IMPORT_GTID(){
    if [ -f ${DIRNAME}/dbs/gtid.sql ];then
        log "\n\n#################### IMPORT GTID #####################"
    else
        log "SKIP IMPORT GTID"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/gtid.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/gtid.sql IMPORT FAILED"
}
IMPORT_DATABASE_DDL(){
    if [ -f ${DIRNAME}/dbs/create.sql ];then
        log "\n\n#################### IMPORT CREATE DATABASE DDL #####################"
    else
        log "SKIP IMPORT DATABASE DDL"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/create.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/create.sql IMPORT FAILED"
}
IMPORT_MYSQL_DATABASE(){
    if [ -d ${DIRNAME}/dbs/mysql ];then
        log "\n\n#################### IMPORT MYSQL DB #####################"
    else
        log "SKIP IMPORT mysql DATABASE"
        return 1
    fi
    for filename in ${DIRNAME}/dbs/mysql/*.sql; do
        import_sql ${filename} || POSTMSG="${POSTMSG}\n ${filename} IMPORT FAILED"
    done
}
IMPORT_MYSQL_STATICS(){
    if [ -f ${DIRNAME}/dbs/special.sql ];then
        log "\n\n#################### IMPORT MYSQL DB STATICS (ONLY FOR mysql 8.x) #####################"
    else
        log "SKIP IMPORT mysql statics. MYSQL_VERSION: ${MYSQL_VERSION[@]}"
        return 1
    fi
    import_sql ${DIRNAME}/dbs/special.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/special.sql IMPORT FAILED"
}
#import application tables concurrently
IMPORT_APP_TABLE(){
    log "\n\n#################### IMPORT APP TABLE&DATA #####################"
    T_START=`date +%s`
    PIDS=()
    for dirname in `find ${DIRNAME}/dbs -type d`;do
        postname=`echo ${dirname} | awk -F '/' '{print $NF}'`
        if [ "${postname}" == "mysql" ] || [ "${postname}" == "" ] || [ "${DIRNAME}/dbs" == "${dirname}" ];then
            continue #skip the mysql system database
        fi
        log "IMPORT DATABASE FOR ${postname} BEGIN"
        #PIDS=()
        db_start_time=`date +%s`
        for filename in `find ${dirname} -name '*.sql'`;do
            while [ ${#PIDS[@]} -ge ${CONCURRENCY} ];do
                sleep ${SLEEP_INTERNAL}
                for i in "${!PIDS[@]}";do
                    if ! kill -0 "${PIDS[$i]}" 2>/dev/null;then
                        unset 'PIDS[i]' #this import process has finished, drop it from the array
                    fi
                done
                PIDS=("${PIDS[@]}") #re-pack the PIDS array
            done
            import_sql "${filename}" & #background the import, i.e. run concurrently
            PIDS+=($!)
            ((FILES_COUNT++))
        done
        #wait #wait for all tables of this database; otherwise the process count may exceed the limit. removing it improves performance a bit
        db_stop_time=`date +%s`
        ((DB_COUNT++))
        log "IMPORT DATABASE FOR ${postname} FINISH. COST_TIME: $[ ${db_stop_time} - ${db_start_time} ] SECONDS."
    done
    wait #wait for all remaining background processes
    T_STOP=`date +%s`
    APP_TABLE_TIME_COUNT="$[ ${T_STOP} - ${T_START} ]"
}
IMPORT_APP_EVENT(){
    log "\n\n#################### IMPORT APP EVENTS #####################"
    for filename in `find ${DIRNAME}/events -name '*.sql'`;do
        import_sql ${filename}
    done
}
IMPORT_APP_ROUTINE(){
    log "\n\n#################### IMPORT APP ROUTINES #####################"
    for filename in `find ${DIRNAME}/routines -name '*.sql'`;do
        import_sql ${filename}
    done
}
IMPORT_APP_VIEW(){
    log "\n\n#################### IMPORT APP VIEWS #####################"
    for filename in `find ${DIRNAME}/views -name '*.sql'`;do
        import_sql ${filename}
    done
}
_START_DT=`date +%s`
echo -e "\n\n********** BEGIN CHECK **************"
#check connectivity
CHECK_CONN
#check whether GTIDs already exist
CHECK_GTID
#check the server version
CHECK_VERSION
#check disabled_storage_engines and log_bin_trust_function_creators
CHECK_VARIABELS
echo -e "********** CHECK FINISH **************\n\n"
#data import
echo -e "\n\n********** BEGIN IMPORT DATA **************"
#import the CHANGE MASTER statement. commented out by default, enable manually
IMPORT_CHANGE_MASTER
#import GTIDs (8.0.x)
if [ "${MYSQL_VERSION[0]}" == "8" ];then
    IMPORT_GTID
fi
#import CREATE DATABASE DDL
IMPORT_DATABASE_DDL
#import system tables
IMPORT_MYSQL_DATABASE
#import statistics
IMPORT_MYSQL_STATICS
#application tables (concurrent, may contain triggers)
IMPORT_APP_TABLE
#import events
IMPORT_APP_EVENT
#application stored procedures and functions
IMPORT_APP_ROUTINE
#application views
IMPORT_APP_VIEW
#import GTIDs (5.7.x)
if [ "${MYSQL_VERSION[0]}" == "5" ];then
    IMPORT_GTID
fi
_STOP_DT=`date +%s`
FAIL_COUNT_2=`grep ' FAILED$' ${LOGFILE} | wc -l`
log "APP DATABASE COUNT: ${DB_COUNT} APP TABLE COUNT: ${FILES_COUNT} APP DATA IMPORT COST_TIME: ${APP_TABLE_TIME_COUNT} SECONDS."
log "IMPORT ALL FINISH. TOTAL COST TIME $[ ${_STOP_DT} - ${_START_DT} ] SECONDS. FAILED COUNT: $[ ${FAIL_COUNT_2} - ${FAIL_COUNT_1} ]"
log "ERROR COUNT: ${ERROR_COUNT}"
if [ "${POSTMSG}" != "" ];then
    log "${POSTMSG}"
fi
#statistics import failed
if echo "${POSTMSG}" | grep -E "innodb_index_stats.sql|innodb_table_stats.sql" >/dev/null 2>&1 ;then
    log "statistics import failed, probably a 5.7 --> 8.0 import. suggestions:\n\t 1. import the statistics tables manually (remove the DROP TABLE and CREATE TABLE statements first)\n\t 2. collect the statistics manually with ANALYZE TABLE"
fi
Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization; reproduction without permission is prohibited.
For infringement concerns, contact cloudcommunity@tencent.com for removal.