前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >DB离线恢复演练

DB离线恢复演练

原创
作者头像
保持热爱奔赴山海
修改2023-10-19 08:09:39
2180
修改2023-10-19 08:09:39
举报
文章被收录于专栏:饮水机管理员饮水机管理员

作为一名DBA,需要确保数据库有备份,同时也要确保备份文件是有效的。因此需要定期对生产的备份文件进行离线恢复,验证备份文件的可用性。

这里分享下自动化的思路:

1、从备份集中随机挑选一个实例,然后随机挑选一个日期

下面这个脚本 放到 /mnt/mysql_logical_backup/ 这个备份文件的根目录下:

代码语言:python
复制
# 我这里的备份文件夹路径类型如下:
#  /mnt/mysql_logical_backup/prod-devops-cmdb-01/2023-09-23/ 下面是以db命名的gz压缩包
#  /mnt/mysql_logical_backup/prod-devops-cicd-02/2023-10-11/ 下面是以db命名的gz压缩包

import os
import glob
import random
import shutil
import gzip
import subprocess
import json
import mysql.connector
from datetime import datetime

# 获取当前工作目录
cwd = os.getcwd()

# 列出当前目录下的所有文件夹
foldername_list = []
for foldername in os.listdir(cwd):
    if os.path.isdir(os.path.join(cwd, foldername)):
        foldername_list.append(foldername)

# 随机取一个备份集
random_instance = random.choice(foldername_list)
print("抽取的实例为", random_instance)

# 进入到这个 random_instance 目录下,在近N天的备份目录中,随机抽一个日期
directory_path = str(random_instance)

# 切换到目标目录
os.chdir(directory_path)

# 不找最后一个日期的文件夹,防止出现数据不完整的情况
random_directory_list = sorted(os.listdir())[-5:-1]
day_path = random.choice(random_directory_list)
print("抽取的备份文件的日期名为", day_path)

# 进到这个日期名下的文件夹
os.chdir(day_path)

# 随机抽几个库备份的压缩包
res2 = sorted(os.listdir())
random_db_list = set()  # 防止随机取的库重复,用set来存储
for i in range(5):
    res3 = random.choice(res2)
    if "mysql" in res3:  # 脚本在备份的时候已经把其他的系统库滤掉,这里再过滤一次mysql库
        continue
    random_db_list.add(res3)


# 复制这N个库的备份文件,并解压到临时目录 /mnt/mysql_3316/temp/ 下
target_dir = "/mnt/mysql_3316/temp/"
for ii in random_db_list:
    print(ii)
    # 指定源文件路径和目标路径
    source_file = (
        "/mnt/mysql_backup/"
        + str(directory_path)
        + "/"
        + str(day_path)
        + "/"
        + str(ii)
    )
    print(source_file)

    # 复制文件到目标目录
    shutil.copy(source_file, target_dir)

    # 解压文件
    gzip_file = ii
    sql_file = (
        target_dir + gzip_file.split("/")[-1].split(".")[0].split("-")[0] + ".sql"
    )
    with gzip.open(gzip_file, "rb") as f_in, open(sql_file, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)

    # 将数据文件导入到 sandbox中(为了最大化导入效率, 这里建议直接使用mysql命令行导入)
    mysql_cmd = [
        "mysql",
        "-u",
        "root",
        "-p123456",
        "-h",
        "127.0.0.1",
        "--port",
        "3316",
    ]

    # 构建source命令
    source_cmd = f"source {sql_file}"

    # 使用subprocess调用MySQL命令行
    process = subprocess.Popen(
        mysql_cmd + ["-e", source_cmd],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    # 获取命令行输出和错误信息
    stdout, stderr = process.communicate()

    # 输出结果
    print("输出:", stdout.decode("utf-8"))
    print("错误:", stderr.decode("utf-8"))

# 随机抽取50张表,统计行数,并写到文件中
mydb = mysql.connector.connect(
    host="127.0.0.1",
    port=3316,
    user="root",
    passwd="123456",
    database="information_schema",
    autocommit=True,
)
mycursor = mydb.cursor()

mycursor.execute(
    """select concat('select count(*) from ','`', TABLE_SCHEMA,'`.`',TABLE_NAME,'`', ' where 1=1;' ) as count_sql from information_schema.tables where TABLE_SCHEMA not in ('sys','test','mysql','information_schema','performance_schema') order by rand() limit 50;"""
)

res4 = mycursor.fetchall()

res_all = dict()
for sql in res4:
    mycursor.execute(sql[0])
    res5 = mycursor.fetchone()
    res_all[sql[0]] = res5[0]

res_all = (
    '{"instance":"'
    + str(random_instance)
    + '","detail":['
    + json.dumps(res_all)
    + "]}"
)

# 结果写到以实例命名的log文件中
with open("/tmp/res.log", "w") as f:
    f.write(res_all)

# 日志
with open("/tmp/recovery_test.log", "a+") as f:
    f.writelines(
        "脚本执行的日期:"
        + str(datetime.now().date())
        + ",抽取的实例:"
        + random_instance
        + ", 抽取的备份文件的日期名:"
        + day_path
        + "\n"
    )

# 最后,清理下临时文件,避免太多磁盘占用
files = glob.glob(os.path.join(target_dir, "*"))
for file in files:
    if os.path.isfile(file):
        os.remove(file)
    elif os.path.isdir(file):
        shutil.rmtree(file)

2、对step1的sql发到生产去执行,并比对结果的差

代码语言:python
复制
这里就不贴代码了,其实就是解析step1的json结果,然后去生产count(*)查询数据而已。
比对完成后,还需要把结果记录到数据库中,然后发个总结邮件出来。
最后,在验证无误后,还要把临时MySQL里面的数据清掉,防止遗忘造成数据泄露。

大体就是这个流程了。代码写的比较拉,这里就不贴了。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、从备份集中随机挑选一个实例,然后随机挑选一个日期
  • 2、对step1的sql发到生产去执行,并比对结果的差
相关产品与服务
TDSQL MySQL 版
TDSQL MySQL 版(TDSQL for MySQL)是腾讯打造的一款分布式数据库产品,具备强一致高可用、全球部署架构、分布式水平扩展、高性能、企业级安全等特性,同时提供智能 DBA、自动化运营、监控告警等配套设施,为客户提供完整的分布式数据库解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档