
[MySQL] How do you import data into MySQL concurrently?

By 大大刺猬 · Published 2024-02-29

Introduction

We usually export data with mysqldump and import it back with the mysql client. The script from the previous article lets us watch the import progress, but it does nothing to shorten the wait.

The mysql client imports in a single thread, which is slow. If we split the .sql file into multiple files and import them concurrently, it gets much faster.

I actually tried this before, but the results were poor: even with 32-way concurrency the speed was about the same, because back then every INSERT statement was spread evenly across the files, which does not improve the import speed at all.

Principle

Learning from that lesson, this time the dump is split into one file per table, and those files are imported concurrently. With the tables loading at the same time, the overall speed improves.

The mysqldump file format

The dumps produced by mysqldump in MySQL 5.7 and 8.0 are almost identical; there are only minor differences.

The layout looks like this:

Code language: note
client and server version info

character set and other variable settings
SQL_LOG_BIN

GLOBAL.GTID_PURGED  (here if the dump comes from 8.0)
CHANGE MASTER 

db1
	create table  table definition
	insert into   data
	trigger       triggers
	v1 view       temporary view structure
db2 ....


db1
	event
	routine
db2
	...
	

db1 
	view (final view structure)
db2 ....

GLOBAL.GTID_PURGED  (here if the dump comes from 5.7)
character set and other variable settings (restored at the end)

Keyword matching

Based on this layout we can write a script to split the file.

The mysqldump developers apparently had this in mind, since the file contains consistent comment markers for each section.

The markers are as follows:

Code language: note
GTID 8.0 (at the beginning)
--
-- GTID state at the beginning of the backup 
--


GTID 5.7 (at the end)
--
-- GTID state at the end of the backup 
--



CHANGE MASTER
--
-- Position to start replication or point-in-time recovery from
--


CREATE DATABASE statement
--
-- Current Database: `ibd2sql`
--

Table structure
--
-- Table structure for table `AllTypesExample`
--


INSERT
--
-- Dumping data for table `AllTypesExample`
--


View (placed alongside the tables), 8.0
--
-- Temporary view structure for view `v1`
--

View (placed alongside the tables), 5.7
--
-- Temporary table structure for view `v_1`
--

EVENT
--
-- Dumping events for database 'db2'
--


ROUTINES
--
-- Dumping routines for database 'db2'
--


VIEW (at the end)
--
-- Final view structure for view `v_1`
--

How the concurrent import works

The idea behind the concurrent import is simple: push each import process into the background. The main thing to get right is the import order.
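A minimal sketch of that idea, assuming two of the per-table files produced by the split step (the paths are illustrative):

Code language: shell
# start two imports at the same time by backgrounding the mysql clients
mysql -h127.0.0.1 -P3314 -p123456 < dbs/db1/t1.sql &
mysql -h127.0.0.1 -P3314 -p123456 < dbs/db1/t2.sql &
wait   # block until both background imports have finished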

When importing a 5.7 dump into 8.0, note that the DROP/CREATE statements for the statistics tables (innodb_table_stats / innodb_index_stats) cannot be executed; either comment them out manually before importing, or collect the statistics manually afterwards.
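If you go with collecting the statistics manually, something like this is enough after the data is in (the table names are just placeholders):

Code language: shell
# rebuild optimizer statistics for the imported tables
mysql -h127.0.0.1 -P3314 -p123456 -e "ANALYZE TABLE db1.t1, db1.t2;"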

Script details

The .sql file split script

Description

MysqlDumpSplitSQL.py is written in Python 2 (Python 3 hits some encoding issues). It splits a mysqldump .sql file into multiple files, laid out like this:

For an 8.0 dump there is additionally dbs/special.sql, which holds the statistics-table data.

Code language: note
splitByddcw_20240229_165143
├── dbs
│   ├── chartest
│   │   ├── appmap.sql
│   │   └── app.sql
│   ├── create.sql
│   ├── gtid.sql
│   ├── master_info.txt
│   ├── T20240227_2
│   └── test
│       └── test.sql
├── events
│   ├── chartest.sql
│   └── test.sql
├── routines
│   ├── chartest.sql
│   ├── db1.sql
└── views
    ├── db1.sql
    └── t20240227.sql

It is fast (very fast): a 1.7 GB file takes only about 4 seconds to split.

Usage

Code language: shell
python MysqlDumpSplitSQL.py t20240228_alldb.sql

If you only want specific tables, use --table tablename to match them; regular expressions are supported, for example:
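A sketch, splitting out only the appmap table shown in the tree above (combine it with --database to narrow the match further):

Code language: shell
python MysqlDumpSplitSQL.py --table appmap t20240228_alldb.sql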

The full usage:

Code language: shell
python MysqlDumpSplitSQL.py -h
usage: MysqlDumpSplitSQL.py [-h] [--version] [--database DATABASE]
                            [--table TABLE] [--output-dir OUTPUT_DIR]
                            [--presql PRESQL] [--file FILENAME]
                            [--log-file LOG_FILENAME]
                            [files [files ...]]

Split a .sql file produced by mysqldump.

positional arguments:
  files                 the mysqldump .sql file(s) to split

optional arguments:
  -h, --help            show this help message and exit
  --version, -v, -V     version info
  --database DATABASE   only split out this database
  --table TABLE         only split out this table
  --output-dir OUTPUT_DIR
                        output directory
  --presql PRESQL       SQL prepended to each .sql file, e.g. set
                        sql_log_bin=off; set names utf8mb4
  --file FILENAME       the mysqldump .sql file to split
  --log-file LOG_FILENAME
                        log file

The import script

Description

testparallel.sh imports in the same order that mysqldump exported, additionally checks a few special server variables, and then runs the table imports concurrently. That is about it. The concurrency logic is: push each import into the background, poll in a loop, and as soon as one finishes start the next. Because it works at the file (per-table) level, there is a weakest-link effect: the largest single table bounds the total time.
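The core of that loop, condensed from the full script in the appendix:

Code language: shell
while [ ${#PIDS[@]} -ge ${CONCURRENCY} ];do   # already at the concurrency limit?
	sleep ${SLEEP_INTERNAL}
	for i in "${!PIDS[@]}";do
		if ! kill -0 "${PIDS[$i]}" 2>/dev/null;then
			unset 'PIDS[i]'               # that import finished, drop its pid
		fi
	done
	PIDS=("${PIDS[@]}")                       # re-pack the array
done
import_sql "${filename}" &                    # start the next import in the background
PIDS+=($!)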

Usage

Edit the connection info and the concurrency level in the script, then run it with the split directory from the previous step as its argument. The script works out the rest on its own.

Code language: shell
sh testparallel.sh splitByddcw_20240229_165143

The parameters to adjust:

Code language: shell
#tunable parameters
CONCURRENCY=4                #number of concurrent imports
SLEEP_INTERNAL="0.01"        #check every SLEEP_INTERNAL seconds whether an import has finished
IGNORE_GTID_CHECK="0"        #1 = do not check whether a GTID set already exists
IGNORE_FUCNTION_CREATOR="0"  #1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0"    #1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log"         #import log; same content as the console output
DIRNAME=$1                   #directory containing the split mysqldump SQL files
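The connection itself is defined a few lines further down in the script via MYSQL_COM; point it at your target instance:

Code language: shell
#MySQL connection info
MYSQL_COM="mysql -h127.0.0.1 -p123456 -P3314 -uroot "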

Demo

Rather than keep talking, here is a demo. The log output is long, so the middle part is omitted.

To verify correctness, we record checksums of the relevant tables before and after the import with the commands below, and then confirm the script really is sound.

Reference checksum commands:

Code language: shell
# data checksum before the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort  | mysql -h127.0.0.1 -P3314 -p123456  > /tmp/before_check.txt

# data checksum after the import:
mysql -h127.0.0.1 -P3314 -p123456 -NB -e "select concat('CHECKSUM TABLE \`',TABLE_SCHEMA,'\`.\`',TABLE_NAME,'\`;') FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA NOT IN('sys','mysql','information_schema','performance_schema');" | sort | mysql -h127.0.0.1 -P3314 -p123456  > /tmp/after_check.txt

# compare the two (no output from diff means the data matches)
diff /tmp/before_check.txt /tmp/after_check.txt

Export

Nothing special about the export; just dump all databases:

Code language: shell
17:05:28 [root@ddcw21 mysqldump_t20240226]#mysqldump -h127.0.0.1 -P3314 -p123456 --events --triggers --single-transaction --routines --master-data=2 -A > t20240229_alldb.sql
mysqldump: [Warning] Using a password on the command line interface can be insecure.

17:06:48 [root@ddcw21 mysqldump_t20240226]#
17:06:48 [root@ddcw21 mysqldump_t20240226]#ll -ahrlt t20240229_alldb.sql
-rw-r--r-- 1 root root 1.7G Feb 29 17:06 t20240229_alldb.sql

Split

Just run the split script on the dump:

Code language: shell
17:07:55 [root@ddcw21 mysqldump_t20240226]#python MysqlDumpSplitSQL.py t20240229_alldb.sql 
2024-02-29 17:08:03 CLIENT_VERSION: 8.0.28,  SERVER_VERSION: 8.0.28FILE_HEADER:
-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!50503 SET NAMES utf8mb4 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!50606 SET @OLD_INNODB_STATS_AUTO_RECALC=@@INNODB_STATS_AUTO_RECALC */;
/*!50606 SET GLOBAL INNODB_STATS_AUTO_RECALC=OFF */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
SET @MYSQLDUMP_TEMP_LOG_BIN = @@SESSION.SQL_LOG_BIN;
SET @@SESSION.SQL_LOG_BIN= 0;



2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.columns_priv FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.component BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.component FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:03 READ TABLE FOR mysql.db BEGIN
2024-02-29 17:08:03 READ TABLE FOR mysql.db FINISH. COST TIME: 0.0 seconds

........................................
(middle of the log omitted, it is too long)
........................................
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ TABLE FOR test.test BEGIN
2024-02-29 17:08:07 READ TABLE FOR test.test FINISH. COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ EVENT FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ EVENT FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test BEGIN
2024-02-29 17:08:07 READ ROUTINE FOR DATABASE test FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE db1 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 BEGIN
2024-02-29 17:08:07 READ VIEW FOR DATABASE t20240227_2 FINISH COST TIME: 0.0 seconds
2024-02-29 17:08:07 READ ALL FINISH
2024-02-29 17:08:07 FILENAME     : /root/mysqldump_t20240226/t20240229_alldb.sql
2024-02-29 17:08:07 OUTPUT_DIR   : /root/mysqldump_t20240226/splitByddcw_20240229_170803
2024-02-29 17:08:07 LOG_FILENAME : /root/mysqldump_t20240226/SplitMysqlDumpSQL.log
2024-02-29 17:08:07 COST TIME    : 4.02 SECONDS.  TABLES COUNT: 412
2024-02-29 17:08:07 WARNING      : 0
17:08:07 [root@ddcw21 mysqldump_t20240226]#

4 seconds -_-

Concurrent import

A direct import does not go through because of the GTID check:

Code language: shell
17:09:54 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803


********** BEGIN CHECK **************
2024-02-29 17:10:03 CONNCT SUCCESS.
2024-02-29 17:10:03 CHECK_CONN OK
2024-02-29 17:10:03 CURRENT GTID: b68e2434-cd30-11ec-b536-000c2980c11e:1-35 FAILED!
17:10:03 [root@ddcw21 mysqldump_t20240226]#

So edit the script and enable the ignore-GTID switch; let's enable IGNORE_FUCNTION_CREATOR as well:

Code language: shell
IGNORE_GTID_CHECK="1"       
IGNORE_FUCNTION_CREATOR="1"

Then import again:

Code language: shell
17:11:38 [root@ddcw21 mysqldump_t20240226]#sh testparallel.sh splitByddcw_20240229_170803


********** BEGIN CHECK **************
2024-02-29 17:11:39 CONNCT SUCCESS.
2024-02-29 17:11:39 CHECK_CONN OK
2024-02-29 17:11:39 CHECK_GTID OK
2024-02-29 17:11:39 MYSQL VERSION: 8 0 28
2024-02-29 17:11:39 CHECK_VERSION OK
2024-02-29 17:11:39 disabled_storage_engines= OK
2024-02-29 17:11:39 log_bin_trust_function_creators=1 OK
........................................
(middle of the log omitted, it is too long)
........................................
#################### IMPORT APP VIEWS #####################
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/db1.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql BEGIN...
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql SUCCESS.
2024-02-29 17:15:16 IMPORT splitByddcw_20240229_170803/views/t20240227_2.sql FINISH. cost 0 seconds
2024-02-29 17:15:16 APP DATABASE COUNT: 24 APP TABLE COUNT: 377 APP DATA IMPORT COST_TIME: 215 SECONDS.
2024-02-29 17:15:16 IMPORT ALL FINISH. TOTAL COST TIME 217 SECONDS. FAILED COUNT: 1
2024-02-29 17:15:16 ERROR COUNT: 1
2024-02-29 17:15:16 
 splitByddcw_20240229_170803/dbs/gtid.sql IMPORT FAILED

217 seconds in total, not bad... the main cost is that one big table, which is slow (170 seconds on its own).

There is one ERROR: gtid.sql failed to import. We can ignore it, since we deliberately skipped the GTID check. Alternatively, run reset master and then import that gtid.sql again, as sketched below.
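If you do want the GTID set restored, a rough sketch would be (same connection options as above):

Code language: shell
mysql -h127.0.0.1 -P3314 -p123456 -e "RESET MASTER;"
mysql -h127.0.0.1 -P3314 -p123456 < splitByddcw_20240229_170803/dbs/gtid.sql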

Plain import (for comparison)

A plain import also errors out because of the GTID problem, so reset master has to be run first:

Code language: shell
17:19:53 [root@ddcw21 mysqldump_t20240226]#mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql 
mysql: [Warning] Using a password on the command line interface can be insecure.
ERROR 3546 (HY000) at line 26: @@GLOBAL.GTID_PURGED cannot be changed: the added gtid set must not overlap with @@GLOBAL.GTID_EXECUTED

After reset master, start the import (wrapped in time, otherwise there is nothing to compare against):

Code language: shell
17:21:22 [root@ddcw21 mysqldump_t20240226]#time mysql -h127.0.0.1 -P3314 -p123456 < t20240229_alldb.sql 
mysql: [Warning] Using a password on the command line interface can be insecure.


real	4m39.067s
user	0m11.893s
sys	0m4.000s
17:26:02 [root@ddcw21 mysqldump_t20240226]#

Noticeably slower than the concurrent import.

Verification

I forgot to run the before/after data consistency check this time.....

It was checked in earlier test runs, and the data matched.

Timing comparison

Splitting took 4 seconds, and the 4-way concurrent import took 217 seconds (3 min 37 s).

Import type           Time
Native (single file)  4min39s
4-way concurrent      3min37s
8-way concurrent      3min12s

The gain is real, but the weakest-link effect limits it.

Summary

  1. Concurrent import does speed MySQL restores up, but the weakest-link effect applies: if a single table accounts for most of the data, the advantage of concurrency shrinks.
  2. The files produced by mysqldump differ between MySQL 5.7 and 8.0. For the statistics tables, 5.7 includes DROP/CREATE statements, while 8.0 only has INSERT IGNORE INTO.
  3. The GTID information also sits in a different place: at the end of a 5.7 dump, at the beginning of an 8.0 dump.
  4. 4-way concurrency is enough; more than that brings little extra benefit (I/O becomes the bottleneck).

Appendix: related source code

GitHub: https://github.com/ddcw/ddcw/tree/master/python/MySQL%E5%B9%B6%E5%8F%91%E5%AF%BC%E5%85%A5

Modb: https://www.modb.pro/doc/125805

mysqldump split script (MysqlDumpSplitSQL.py)

Code language: python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# write by ddcw @https://github.com/ddcw
# Split a .sql file produced by mysqldump (filtering not supported yet)
# This script does not restore session variables, so importing the output with `source` is not recommended

import sys,os
import re
import datetime,time
import errno
import argparse,glob

# parse command-line arguments
def _argparse():
	parser = argparse.ArgumentParser(add_help=True, description='Split a .sql file produced by mysqldump.')
	parser.add_argument('--version', '-v', '-V', action='store_true', dest="VERSION", default=False, help='version info')
	parser.add_argument('--database', dest="DATABASE", default="*", help='only split out this database')
	parser.add_argument('--table', dest="TABLE", default="*", help='only split out this table')
	parser.add_argument('--output-dir', dest="OUTPUT_DIR", default="", help='output directory')
	#parser.add_argument('--parallel', dest="PARALLEL", default=4, help='import concurrency')
	parser.add_argument('--presql', dest="PRESQL", default="",  help='SQL prepended to each .sql file, e.g. set sql_log_bin=off; set names utf8mb4')
	#parser.add_argument('--postsql', dest="POSTSQL",  help='SQL appended to each .sql file, e.g. set sql_log_bin=on;')
	#parser.add_argument('--mysqlbin', dest="MYSQLBIN", default="mysql -c ", help='mysql command used for the import')
	parser.add_argument('--file', dest="FILENAME", default="", help='the mysqldump .sql file to split')
	parser.add_argument('--log-file', dest="LOG_FILENAME", default="", help='log file')
	parser.add_argument("files", nargs="*", help="the mysqldump .sql file(s) to split")
	if parser.parse_args().VERSION:
		print('VERSION: v0.1')
		sys.exit(0)
	return parser.parse_args()

# check whether db.table matches the filter pattern
def match_table(dbname,tablename):
	dtname = str(dbname) + "." + tablename
	return True if re.search(PATTERN,dtname) else False

def log(*args):
	msg = str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + " " + " ".join([ str(x) for x in args ]) + "\n"
	print(msg[:-1])
	LOG_FD.write(msg)

def read_header(f):
	client_version = "unknown"
	server_version = "unknown"
	gtid = "" # 兼容5.7
	master_info = ""
	header = ""
	while True:
		old_offset = f.tell()
		data = f.readline()
		if data[:17] == "-- Server version":
			server_version = data.split()[-1:][0]
		elif data[:14] == "-- MySQL dump ":
			client_version = data.split()[5]
		elif data[:21] == "-- GTID state at the ":
			tdata = ""
			f.seek(old_offset,0)
			while True:
				_data = f.readline()
				tdata += _data
				if _data[-2:] == ";\n":
					break
			gtid = tdata
		elif data[:21] == "-- Position to start ":
			tdata = ""
			f.seek(old_offset,0)
			while True:
				_data = f.readline()
				tdata += _data
				if _data[-2:] == ";\n":
					break
			master_info = tdata
		elif data[:3] == "/*!" or data[:4] == "SET ":
			header += data
		elif data[:20] == "-- Current Database:":
			f.seek(old_offset,0)
			break
	return header,client_version,server_version,gtid,master_info

def read_view(f,header,db):
	view_name = "views" + "/" + str(db) + ".sql"
	with open(view_name, 'w') as fd:
		fd.write(header)
		fd.write("USE `"+db+"`;\n")
		while True:
			old_offset = f.tell()
			data = f.readline()
			if data[:3] == "-- " and data[:4] != "-- F":
				f.seek(old_offset,0)
				break
			fd.write(data)

def read_routine(f,header,db):
	routine_filename = "routines"+"/"+str(db)+".sql"
	with open(routine_filename, 'w') as fd:
		fd.write(header)
		data = f.readline()
		data += f.readline()
		fd.write("USE `"+db+"`;\n")
		fd.write(data)
		while True:
			old_offset = f.tell()
			data = f.readline()
			if data[:3] == "--\n":
				#f.seek(old_offset,0)
				break
			fd.write(data)

def read_event(f,header,db):
	event_filename = "events/"+db+".sql"
	with open(event_filename, 'w') as fd:
		fd.write(header)
		data = f.readline()
		data += f.readline()
		fd.write("USE `"+db+"`;\n")
		fd.write(data)
		while True:
			data = f.readline()
			fd.write(data)
			if data == "DELIMITER ;\n" or data[:3] == "--\n":
				break
				
		#data = f.readline()

class NULLPASS(object):
	def __init__(self,*args):
		pass
	def write(self,*args):
		pass

def read_table(f,header,db):
	data = f.readline()
	table_name = re.compile("`(.+)`").findall(data)[0]
	data += "--\n" + f.readline()
	filename = "dbs/" + db + "/" + table_name + ".sql"
	with open(filename,'w') as fd:
		if not match_table(db,table_name):
			#log(db+"."+table_name+" NOT MATCH "+PATTERN+" SKIP IT!")
			fd = NULLPASS()
		fd.write(header)
		fd.write(data)
		fd.write("USE `"+db+"`;")
		#read the table definition
		while True:
			data = f.readline()
			fd.write(data)
			if data == "--\n":
				break

		#read the table data
		old_offset = f.tell()
		data = f.readline()
		if data[:26] == "-- Dumping data for table ":
			data += f.readline() + f.readline()
			fd.write(data)
			while True:
				data = f.readline()
				fd.write(data)
				if data == "--\n": #可能有触发器, 所以不能以UNLOCK TABLES;结束
					break
		else:
			f.seek(old_offset,0)
	return True
				

# create a directory, ignoring "already exists"
def mkdir_exists(dirname):
	try:
		os.makedirs(dirname)
	except OSError as e:
		if e.errno != errno.EEXIST:
			raise

if __name__ == '__main__':
	START_TIME = time.time()
	parser = _argparse()
	FILTER_DATABASE = parser.DATABASE
	FILTER_TABLE = parser.TABLE
	PATTERN = str(FILTER_DATABASE).replace("*",".*") + "\." + str(FILTER_TABLE).replace("*",".*")
	filelist = []
	for pattern in parser.files:
		filelist += glob.glob(pattern)
	fileset = filelist
	FILENAME = parser.FILENAME if parser.FILENAME != "" and os.path.exists(parser.FILENAME) else ""
	FILENAME = fileset[0] if len(fileset) >= 1 and FILENAME == "" and os.path.exists(fileset[0]) else ""
	if FILENAME == "":
		print('At least one mysqldump .sql file is required')
		sys.exit(1)

	FILENAME = os.path.abspath(FILENAME) # use an absolute path, since we chdir into the output dir later
	LOG_FILENAME = "SplitMysqlDumpSQL.log" if parser.LOG_FILENAME == "" else parser.LOG_FILENAME
	LOG_FILENAME = os.path.abspath(LOG_FILENAME)

	OUTPUT_DIR = parser.OUTPUT_DIR if parser.OUTPUT_DIR != "" else "splitByddcw_" + str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
	OUTPUT_DIR = os.path.abspath(OUTPUT_DIR)
	#print("READ FILENAME: "+FILENAME+" OUTPUT_DIR: "+OUTPUT_DIR)
	mkdir_exists(OUTPUT_DIR)
	# no free-space check here (lazy); check yourself that about 1.1x the .sql file size is available

	# create the output directories
	os.chdir(OUTPUT_DIR) # switch the working directory so we do not have to join paths everywhere
	mkdir_exists("dbs")       #databases and tables
	mkdir_exists("events")    #events
	#mkdir_exists("triggers")  #triggers
	mkdir_exists("routines")  #stored procedures and functions
	mkdir_exists("views")      #standalone views (Final view structure), excludes "Temporary table structure for view"

	f = open(FILENAME,'r')
	LOG_FD = open(LOG_FILENAME,'a')
	CREATE_DB_FD = open('dbs/create.sql','w') #CREATE DATABASE statements
	FILE_HEADER,client_version,server_version,gtid_info,master_info = read_header(f)
	CREATE_DB_FD.write(FILE_HEADER)
	CREATE_DB_FD.write("\n")
	FILE_HEADER = "-- AUTO SPLIT MYSQLDUMP FILE BY DDCW @https://github.com/ddcw\n" + FILE_HEADER + "\n" + parser.PRESQL + "\n"
	_msg = "CLIENT_VERSION: " + client_version + "  SERVER_VERSION: " + server_version + "FILE_HEADER:\n" + FILE_HEADER
	log(_msg)
	# write out the CHANGE MASTER info
	if master_info != "":
		_master_file = "dbs/master_info.txt"
		with open(_master_file, 'w') as _mf:
			_mf.write(master_info)

	# read the database/table sections
	CURRENT_DB = ""
	TABLE_COUNT = 0
	WARNING_COUNT = 0
	while True:
		old_offset = f.tell()
		data = f.readline()
		if data == "": #读完了.
			break
		elif data[:20] == "-- Current Database:":
			CURRENT_DB = re.compile("`(.+)`").findall(data)[0]
			_dirname = "dbs/" + str(CURRENT_DB)
			mkdir_exists(_dirname)
		elif data[:16] == "CREATE DATABASE ":
			CREATE_DB_FD.write(data)
		#CREATE TABLE, INSERT, CREATE TRIGGER, CREATE VIEW
		elif data[:28] == "-- Table structure for table" or data[:36] == "-- Temporary view structure for view" or data[:37] == "-- Temporary table structure for view":
			TABLE_COUNT += 1
			f.seek(old_offset,0)
			table_name = re.compile("`(.+)`").findall(data)[0]
			log("READ TABLE FOR "+CURRENT_DB+"."+table_name,'BEGIN')
			_st = time.time()
			read_table(f,FILE_HEADER,CURRENT_DB)
			_et = time.time()
			log("READ TABLE FOR "+CURRENT_DB+"."+table_name,"FINISH.","COST TIME: "+ str(round((_et-_st),2)) +" seconds")

		#READ EVENT
		elif data[:31] == "-- Dumping events for database ":
			CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
			f.seek(old_offset,0)
			log("READ EVENT FOR DATABASE "+CURRENT_DB+" BEGIN")
			_st = time.time()
			read_event(f,FILE_HEADER,CURRENT_DB)
			_et = time.time()
			log("READ EVENT FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")
			

		#READ ROUTINE
		elif data[:33] == "-- Dumping routines for database ":
			CURRENT_DB = re.compile("'(.+)'").findall(data)[0]
			f.seek(old_offset,0)
			log("READ ROUTINE FOR DATABASE "+CURRENT_DB+" BEGIN")
			_st = time.time()
			read_routine(f,FILE_HEADER,CURRENT_DB)
			_et = time.time()
			log("READ ROUTINE FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")

		#READ VIEW
		elif data[:33] == "-- Final view structure for view ":
			f.seek(old_offset,0)
			log("READ VIEW FOR DATABASE "+CURRENT_DB+" BEGIN")
			_st = time.time()
			read_view(f,FILE_HEADER,CURRENT_DB)
			_et = time.time()
			log("READ VIEW FOR DATABASE "+CURRENT_DB+" FINISH","COST TIME: "+ str(round((_et-_st),2)) +" seconds")

		elif data[:21] == "-- GTID state at the ":
			tdata = ""
			f.seek(old_offset,0)
			log("READ GTID PURGED")
			while True:
				_data = f.readline()
				tdata += _data
				if _data[-2:] == ";\n":
					break
			gtid_info += tdata
		elif data[:26] == "-- Dumping data for table ": #system schema, mainly statistics tables
			# ONLY FOR MYSQL 8.x
			_filename = "dbs/special.sql"
			with open(_filename,'a') as fd:
				fd.write(FILE_HEADER)
				fd.write("\n")
				fd.write(data)
				fd.write("USE `"+CURRENT_DB+"`;\n")
				while True:
					_old_offset = f.tell()
					data = f.readline()
					if data[:3] == "-- ":
						f.seek(_old_offset,0)
						break
					fd.write(data)
		elif data[:20] == "-- Dump completed on":
			log("READ ALL FINISH")
			break
		elif data[:4] == "USE " or data == "\n" or data == "--\n":
			pass
		elif data[:3] == "/*!" or data[:4] == "SET ": #skip trailing comments and SET statements
			pass
		else:
			WARNING_COUNT += 1
			log("========== SKIP ==========\n"+data)


	# write the GTID info; 5.7 puts it at the end, so write it last
	if gtid_info != "":
		_gtid_file = "dbs/gtid.sql"
		with open(_gtid_file, 'w') as _mf:
			_mf.write(FILE_HEADER)
			_mf.write("\n")
			_mf.write(gtid_info)
	STOP_TIME = time.time()
	log("FILENAME     :",FILENAME)
	log("OUTPUT_DIR   :",OUTPUT_DIR)
	log("LOG_FILENAME :",LOG_FILENAME)
	log("COST TIME    :",str(round((STOP_TIME - START_TIME),2)), "SECONDS.  TABLES COUNT:",TABLE_COUNT)
	log("WARNING      :",WARNING_COUNT)
	f.close()
	LOG_FD.close()
	CREATE_DB_FD.close()

Concurrent import script (testparallel.sh)

Code language: shell
#!/usr/bin/env bash
#write by ddcw @https://github.com/ddcw


#tunable parameters
CONCURRENCY=4                #number of concurrent imports
SLEEP_INTERNAL="0.01"        #check every SLEEP_INTERNAL seconds whether an import has finished
IGNORE_GTID_CHECK="1"        #1 = do not check whether a GTID set already exists
IGNORE_FUCNTION_CREATOR="1"  #1 = do not check that log_bin_trust_function_creators is 1
IGNORE_DISABLE_ENGINE="0"    #1 = do not check whether disabled_storage_engines contains MyISAM
LOGFILE="import.log"         #import log; same content as the console output
DIRNAME=$1                   #directory containing the split mysqldump SQL files

#MySQL connection info
MYSQL_COM="mysql -h127.0.0.1 -p123456 -P3314 -uroot "


touch ${LOGFILE}
#internal parameters, do not modify
MYSQL_VERSION=()            #MySQL server version
export LANG="en_US.UTF-8"   #set LANG
POSTMSG=""                  #messages to print at the end, collected along the way
APP_TABLE_TIME_COUNT="0"    #time spent importing application tables
DB_COUNT=0                  #number of databases imported, excluding system schemas
FILES_COUNT=0               #number of files imported, application tables only
FAIL_COUNT_1=`grep ' FAILED$' ${LOGFILE} | wc -l` #FAILED lines already present in the log from earlier runs
ERROR_COUNT=0               #error counter

exit1(){
	echo "`date "+%Y-%m-%d %H:%M:%S"` ${@}"
	exit 1
}
[ "${DIRNAME}" == "" ] && exit1 "sh $0 splitByddcw_XXXXXX"
[ -d ${DIRNAME} ] || exit1 "${DIRNAME} is not exists"
DIRNAME=${DIRNAME%/}  #strip a trailing / from the directory so path joining works cleanly

log(){
	_msg="$(date '+%Y-%m-%d %H:%M:%S') $@"
	echo -e ${_msg} # | tee -a ${LOGFILE}
	echo -e ${_msg} >> ${LOGFILE}
	
}

import_sql(){
	file=$1
	ts=`date +%s`
	log "IMPORT ${file} BEGIN..."
	#${MYSQL_COM} < $file >>${LOGFILE} 2>&1 && log "IMPORT ${file} SUCCESS." || log "IMPORT ${file} FAILED"
	if ${MYSQL_COM} < $file >>${LOGFILE} 2>&1;then
		log "IMPORT ${file} SUCCESS."
	else
		log "IMPORT ${file} FAILED"
		((ERROR_COUNT++))
		return 1
	fi
	te=`date +%s`
	log "IMPORT ${file} FINISH. cost $[ ${te} - ${ts} ] seconds"
	return 0
}

CHECK_CONN(){
	if ${MYSQL_COM} -e "select 1+1" >>${LOGFILE} 2>&1; then
		log "CONNCT SUCCESS."
	else
		exit1 "CONNCT FAILD. Please read log (${LOGFILE}) FAILED!"
	fi
	log "CHECK_CONN OK"
}

CHECK_GTID(){
	current_gtid=`${MYSQL_COM} -NB -e "select @@GLOBAL.GTID_EXECUTED" 2>>${LOGFILE}`
	if [ "${current_gtid}" != "" ] && [ "${IGNORE_GTID_CHECK}" != "1" ];then
		exit1 "CURRENT GTID: ${current_gtid} FAILED!"
	fi
	log "CHECK_GTID OK"
}

CHECK_VERSION(){
	IFS='.' read -r -a MYSQL_VERSION <<< `${MYSQL_COM} -NB -e "select @@version" 2>>${LOGFILE}`
	if [ "${MYSQL_VERSION[0]}" == "5" ] || [ "${MYSQL_VERSION[0]}" == "8" ]  ;then
		log "MYSQL VERSION: ${MYSQL_VERSION[@]}"
	else
		exit1 "ONLY FOR MYSQL 5/8, CURRENT MYSQL VERSION:${MYSQL_VERSION[@]} FAILED!"
	fi
	log "CHECK_VERSION OK"
}

CHECK_VARIABELS(){
	disabled_storage_engines=`${MYSQL_COM} -NB -e "select @@disabled_storage_engines" 2>>${LOGFILE}`
	if echo "${disabled_storage_engines}" | grep -i "MyISAM" >/dev/null;then
		if [ "${IGNORE_DISABLE_ENGINE}" != "1" ];then
			exit1 "disabled_storage_engines=${disabled_storage_engines} FAILED"
		fi
	else
		log "disabled_storage_engines=${disabled_storage_engines} OK"
	fi

	log_bin_trust_function_creators=`${MYSQL_COM} -NB -e "select @@log_bin_trust_function_creators" 2>>${LOGFILE}`
	if [ "${log_bin_trust_function_creators}" == "0" ] && [ "${IGNORE_FUCNTION_CREATOR}" != "1" ];then
		exit1 "log_bin_trust_function_creators=${log_bin_trust_function_creators} FAILED"
	else
		log "log_bin_trust_function_creators=${log_bin_trust_function_creators} OK"
	fi
	log "CHECK_VARIABELS OK"
}


IMPORT_CHANGE_MASTER(){
	echo ""
}


IMPORT_GTID(){
	if [ -f ${DIRNAME}/dbs/gtid.sql ];then
		log "\n\n#################### IMPORT GTID #####################"
	else
		log "SKIP IMPORT GTID"
		return 1
	fi
	import_sql ${DIRNAME}/dbs/gtid.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/gtid.sql IMPORT FAILED"
}


IMPORT_DATABASE_DDL(){
	if [ -f ${DIRNAME}/dbs/create.sql ];then
		log "\n\n#################### IMPORT CREATE DATABASE DDL #####################"
	else
		log "SKIP IMPORT DATABASE DDL"
		return 1
	fi
	import_sql ${DIRNAME}/dbs/create.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/create.sql IMPORT FAILED"
}


IMPORT_MYSQL_DATABASE(){
	if [ -d ${DIRNAME}/dbs/mysql ];then
		log "\n\n#################### IMPORT MYSQL DB #####################"
	else
		log "SKIP IMPORT mysql DATABASE"
		return 1
	fi
	for filename in ${DIRNAME}/dbs/mysql/*.sql; do
		import_sql ${filename} || POSTMSG="${POSTMSG}\n ${filename} IMPORT FAILED"
	done
}

IMPORT_MYSQL_STATICS(){
	if [ -f ${DIRNAME}/dbs/special.sql ];then
		log "\n\n#################### IMPORT MYSQL DB STATICS (ONLY FOR mysql 8.x) #####################"
	else
		log "SKIP IMPORT mysql statics. MYSQL_VERSION: ${MYSQL_VERSION[@]}"
		return 1
	fi
	import_sql ${DIRNAME}/dbs/special.sql || POSTMSG="${POSTMSG}\n ${DIRNAME}/dbs/special.sql IMPORT FAILED"
}

#import application tables concurrently
IMPORT_APP_TABLE(){
	log "\n\n#################### IMPORT APP TABLE&DATA #####################"
	T_START=`date +%s`
	PIDS=()
	for dirname in `find ${DIRNAME}/dbs -type d`;do
		postname=`echo ${dirname} | awk -F '/' '{print $NF}'`
		if [ "${postname}" == "mysql" ] || [ "${postname}" == "" ] || [ "${DIRNAME}/dbs" == "${dirname}" ];then
			continue #skip the mysql schema
		fi
		log "IMPORT DATABSE FOR ${postname} BEGIN"
		#PIDS=()
		db_start_time=`date +%s`
		for filename in `find ${dirname} -name '*.sql'`;do
			while [ ${#PIDS[@]} -ge ${CONCURRENCY} ];do
				sleep ${SLEEP_INTERNAL}
				for i in "${!PIDS[@]}";do
					if ! kill -0 "${PIDS[$i]}" 2>/dev/null;then
						unset 'PIDS[i]' #this import finished; remove it from the array
					fi
				done
				PIDS=("${PIDS[@]}") #re-pack the PIDS array
			done
			import_sql "${filename}" & #run the import in the background, i.e. concurrently
			PIDS+=($!)
			((FILES_COUNT++))
		done
		#wait #wait for all tables of this database; without it the process count can briefly exceed the limit, but dropping it improves performance
		db_stop_time=`date +%s`
		((DB_COUNT++))
		log "IMPORT DATABSE FOR ${postname} FINISH.  COST_TIME: $[ ${db_stop_time} - ${db_start_time} ] SECONDS."
	done
	wait #wait for all remaining background imports to finish
	T_STOP=`date +%s`
	APP_TABLE_TIME_COUNT="$[ ${T_STOP} - ${T_START} ]"
}


IMPORT_APP_EVENT(){
	log "\n\n#################### IMPORT APP EVENTS #####################"
	for filename in `find ${DIRNAME}/events -name '*.sql'`;do
		import_sql ${filename}
	done
}

IMPORT_APP_ROUTINE(){
	log "\n\n#################### IMPORT APP ROUTINES #####################"
	for filename in `find ${DIRNAME}/routines -name '*.sql'`;do
		import_sql ${filename}
	done
}

IMPORT_APP_VIEW(){
	log "\n\n#################### IMPORT APP VIEWS #####################"
	for filename in `find ${DIRNAME}/views -name '*.sql'`;do
		import_sql ${filename}
	done
}



_START_DT=`date +%s`

echo -e "\n\n********** BEGIN CHECK **************"
#check connectivity
CHECK_CONN

#check whether a GTID set already exists
CHECK_GTID

#check the server version
CHECK_VERSION

#check the variables disabled_storage_engines and log_bin_trust_function_creators
CHECK_VARIABELS
echo "********** CHECK FINISH **************\n\n"




#data import
echo "\n\n********** BEGIN IMPORT DATA **************"

#import the CHANGE MASTER statement; a no-op by default, enable it manually if needed
IMPORT_CHANGE_MASTER

#import GTID (8.0.x)
if [ "${MYSQL_VERSION[0]}" == "8" ];then
	IMPORT_GTID
fi

#import the CREATE DATABASE DDL
IMPORT_DATABASE_DDL

#import the mysql system schema tables
IMPORT_MYSQL_DATABASE

#import the statistics tables
IMPORT_MYSQL_STATICS

#application tables (concurrent, may include triggers)
IMPORT_APP_TABLE


#import events
IMPORT_APP_EVENT

#application stored procedures and functions
IMPORT_APP_ROUTINE

#application views
IMPORT_APP_VIEW

#import GTID (5.7.x)
if [ "${MYSQL_VERSION[0]}" == "5" ];then
	IMPORT_GTID
fi

_STOP_DT=`date +%s`
FAIL_COUNT_2=`grep ' FAILED$' ${LOGFILE} | wc -l`
log "APP DATABASE COUNT: ${DB_COUNT}    APP TABLE COUNT: ${FILES_COUNT}    APP DATA IMPORT COST_TIME: ${APP_TABLE_TIME_COUNT} SECONDS."
log "IMPORT ALL FINISH. TOTAL COST TIME $[ ${_STOP_DT} - ${_START_DT} ] SECONDS.  FAILED COUNT: $[ ${FAIL_COUNT_2} - ${FAIL_COUNT_1} ]"
log "ERROR COUNT: ${ERROR_COUNT}"
if [ "${POSTMSG}" != "" ];then
	log "${POSTMSG}"
fi

#statistics tables failed to import
if echo "${POSTMSG}" | grep -E "innodb_index_stats.sql|innodb_table_stats.sql" >/dev/null 2>&1 ;then
	log "Statistics import failed, most likely a 5.7 --> 8.0 migration. Suggestions:\n\t 1. import the statistics tables manually (remove the DROP TABLE and CREATE TABLE statements first)\n\t 2. run ANALYZE TABLE manually to rebuild the statistics"
fi

Original statement: this article was published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.