作为一名DBA,需要确保数据库有备份,同时也要确保备份文件是有效的。因此需要定期对生产的备份文件进行离线恢复,验证备份文件的可用性。
这里分享下自动化的思路:
下面这个脚本 放到 /mnt/mysql_logical_backup/ 这个备份文件的根目录下:
# 我这里的备份文件夹路径类型如下:
# /mnt/mysql_logical_backup/prod-devops-cmdb-01/2023-09-23/ 下面是以db命名的gz压缩包
# /mnt/mysql_logical_backup/prod-devops-cicd-02/2023-10-11/ 下面是以db命名的gz压缩包
import os
import glob
import random
import shutil
import gzip
import subprocess
import json
import mysql.connector
from datetime import datetime
# 获取当前工作目录
cwd = os.getcwd()
# 列出当前目录下的所有文件夹
foldername_list = []
for foldername in os.listdir(cwd):
if os.path.isdir(os.path.join(cwd, foldername)):
foldername_list.append(foldername)
# 随机取一个备份集
random_instance = random.choice(foldername_list)
print("抽取的实例为", random_instance)
# 进入到这个 random_instance 目录下,在近N天的备份目录中,随机抽一个日期
directory_path = str(random_instance)
# 切换到目标目录
os.chdir(directory_path)
# 不找最后一个日期的文件夹,防止出现数据不完整的情况
random_directory_list = sorted(os.listdir())[-5:-1]
day_path = random.choice(random_directory_list)
print("抽取的备份文件的日期名为", day_path)
# 进到这个日期名下的文件夹
os.chdir(day_path)
# 随机抽几个库备份的压缩包
res2 = sorted(os.listdir())
random_db_list = set() # 防止随机取的库重复,用set来存储
for i in range(5):
res3 = random.choice(res2)
if "mysql" in res3: # 脚本在备份的时候已经把其他的系统库滤掉,这里再过滤一次mysql库
continue
random_db_list.add(res3)
# 复制这N个库的备份文件,并解压到临时目录 /mnt/mysql_3316/temp/ 下
target_dir = "/mnt/mysql_3316/temp/"
for ii in random_db_list:
print(ii)
# 指定源文件路径和目标路径
source_file = (
"/mnt/mysql_backup/"
+ str(directory_path)
+ "/"
+ str(day_path)
+ "/"
+ str(ii)
)
print(source_file)
# 复制文件到目标目录
shutil.copy(source_file, target_dir)
# 解压文件
gzip_file = ii
sql_file = (
target_dir + gzip_file.split("/")[-1].split(".")[0].split("-")[0] + ".sql"
)
with gzip.open(gzip_file, "rb") as f_in, open(sql_file, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
# 将数据文件导入到 sandbox中(为了最大化导入效率, 这里建议直接使用mysql命令行导入)
mysql_cmd = [
"mysql",
"-u",
"root",
"-p123456",
"-h",
"127.0.0.1",
"--port",
"3316",
]
# 构建source命令
source_cmd = f"source {sql_file}"
# 使用subprocess调用MySQL命令行
process = subprocess.Popen(
mysql_cmd + ["-e", source_cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# 获取命令行输出和错误信息
stdout, stderr = process.communicate()
# 输出结果
print("输出:", stdout.decode("utf-8"))
print("错误:", stderr.decode("utf-8"))
# 随机抽取50张表,统计行数,并写到文件中
mydb = mysql.connector.connect(
host="127.0.0.1",
port=3316,
user="root",
passwd="123456",
database="information_schema",
autocommit=True,
)
mycursor = mydb.cursor()
mycursor.execute(
"""select concat('select count(*) from ','`', TABLE_SCHEMA,'`.`',TABLE_NAME,'`', ' where 1=1;' ) as count_sql from information_schema.tables where TABLE_SCHEMA not in ('sys','test','mysql','information_schema','performance_schema') order by rand() limit 50;"""
)
res4 = mycursor.fetchall()
res_all = dict()
for sql in res4:
mycursor.execute(sql[0])
res5 = mycursor.fetchone()
res_all[sql[0]] = res5[0]
res_all = (
'{"instance":"'
+ str(random_instance)
+ '","detail":['
+ json.dumps(res_all)
+ "]}"
)
# 结果写到以实例命名的log文件中
with open("/tmp/res.log", "w") as f:
f.write(res_all)
# 日志
with open("/tmp/recovery_test.log", "a+") as f:
f.writelines(
"脚本执行的日期:"
+ str(datetime.now().date())
+ ",抽取的实例:"
+ random_instance
+ ", 抽取的备份文件的日期名:"
+ day_path
+ "\n"
)
# 最后,清理下临时文件,避免太多磁盘占用
files = glob.glob(os.path.join(target_dir, "*"))
for file in files:
if os.path.isfile(file):
os.remove(file)
elif os.path.isdir(file):
shutil.rmtree(file)
这里就不贴代码了,其实就是解析step1的json结果,然后去生产count(*)查询数据而已。
比对完成后,还需要把结果记录到数据库中,然后发个总结邮件出来。
最后,在验证无误后,还要把临时MySQL里面的数据清掉,防止遗忘造成数据泄露。
大体就是这个流程了。代码写的比较拉,这里就不贴了。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。