前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MYSQL冷备份数据上传到对象存储

MYSQL冷备份数据上传到对象存储

作者头像
以谁为师
发布2023-12-23 08:20:46
1660
发布2023-12-23 08:20:46
举报

介绍

      将MySQL数据库中的冷数据备份并上传至云平台对象存储的过程。冷数据是指数据库中的历史或不经常访问的数据。我们首先通过执行SQL查询语句从MySQL数据库中提取所需数据,然后将其保存为CSV文件格式,接着通过SDK将备份文件上传到对象存储。

查询

代码语言:javascript
复制
-- 查询最早的数据
SELECT *
FROM bos_balance_flow limit 5

-- 查询最后10条数据
SELECT *
FROM bos_balance_flow
ORDER BY created_at DESC
LIMIT 10;

添加索引

给时间字段加上索引提高查询速度

代码语言:javascript
复制
-- 给订单归档表加时间字段加索引
CREATE INDEX idx_created_at ON bos_order_archive(created_at);

历史数据上传s3

代码语言:javascript
复制
# 后台执行数据备份脚本
nohup python3 db-upload-mongo-s3.py &
代码语言:javascript
复制
# 一次性上传历史mysql数据到s3

import logging
from logging.handlers import RotatingFileHandler
import os
from datetime import datetime, timedelta
import pandas as pd
import pymysql
import boto3
import time

# 配置日志
logger = logging.getLogger('sql_logger')
logger.setLevel(logging.INFO)

# 配置RotatingFileHandler,设置日志文件路径、文件大小和文件数量
handler = RotatingFileHandler('/tmp/sql.log', maxBytes=50 * 1024 * 1024, backupCount=2)

# 设置日志格式
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 添加handler到logger对象
logger.addHandler(handler)

# 添加日志输出到终端
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)

# AWS S3 配置
AWS_ACCESS_KEY = "AKIASEGBIssss"
AWS_SECRET_KEY = "xxxxxxxx"
S3_BUCKET_NAME = "pro-s3-db"
S3_DIRECTORY = "mongo_bos_server"

# 数据库连接配置
DB_HOST = "172.16.99.99"
DB_USER = "root"
DB_PASSWORD = "xxxxxxxx"
DB_NAME = "bos_server"

# 日期格式
DATE_FORMAT = "%Y-%m-%d"

# 获取每个表的最早和最后日期
table_dates = {}

with pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME) as connection:
    
    # 将需要处理的表添加到列表中
    for table in ["bos_order_archive", "bos_order_future"]:
        # 查询每个表中最早和最后的日期
        min_date_query = f"SELECT MIN(created_at) FROM {table}"
        max_date_query = f"SELECT MAX(created_at) FROM {table}"

        min_date_result = pd.read_sql_query(min_date_query, connection)
        max_date_result = pd.read_sql_query(max_date_query, connection)

        # 转换日期格式
        min_date = min_date_result.iloc[0, 0].strftime(DATE_FORMAT)
        max_date = max_date_result.iloc[0, 0].strftime(DATE_FORMAT)

        table_dates[table] = {"min_date": min_date, "max_date": max_date}

# 遍历每个表的日期范围
for table, dates in table_dates.items():
    current_date = datetime.strptime(dates["min_date"], DATE_FORMAT)
    end_date = datetime.strptime(dates["max_date"], DATE_FORMAT)

    while current_date <= end_date:
        current_date_str = current_date.strftime(DATE_FORMAT)

        # 构建 SQL 查询语句
        sql_query = f"SELECT * FROM {table} WHERE created_at >= '{current_date_str} 00:00:00.000' AND created_at < '{current_date_str} 23:59:59.999'"

        # 使用 with 语句连接数据库
        with pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME) as connection:
            # 使用 pandas 读取数据库数据
            df = pd.read_sql_query(sql_query, connection)

        # 如果数据不为空则上传到S3
        if not df.empty:
            # 生成 CSV 文件名
            csv_filename = f"{table}_{current_date_str}.csv"

            # 保存数据到 CSV 文件
            df.to_csv(csv_filename, index=False)

            # 获取文件大小
            file_size = os.path.getsize(csv_filename)
            file_size_mb = file_size / (1024 * 1024)  # Convert bytes to megabytes

            # 构建S3目标路径
            s3_object_key = f"{S3_DIRECTORY}/{csv_filename}"

            # 使用 boto3 上传文件至 S3
            s3_client = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)
            s3_client.upload_file(csv_filename, S3_BUCKET_NAME, s3_object_key)

            # 记录日志
            logger.info(f"文件 {csv_filename} 已上传到 S3 存储桶 {S3_BUCKET_NAME} 目录 {S3_DIRECTORY},文件大小: {file_size_mb:.2f} MB,上传成功")

            # 等待5秒
            time.sleep(5)

        # 增加一天
        current_date += timedelta(days=1)

每日数据上传s3

      从一个数据库中获取前一天的数据。将数据存储到一个 CSV 文件中。 检查本地是否已存在该 CSV 文件,如果存在则不执行数据库查询,直接将已有文件上传到 Amazon S3 存储桶中。

代码语言:javascript
复制
import os
import pandas as pd
import boto3
from datetime import datetime, timedelta
import requests
import pymysql



def send_notification(title, content):
    message = {
        "msg_type": "text",
        "content": {
            "text": f"{title}\n{content}"
        }
    }
    lark_headers = {"Content-Type": "application/json"}
    webhook_url = "https://open.larksuite.com/open-apis/bot/v2/hook/xxxxxxxxxxxxxxx"
    response = requests.post(webhook_url, json=message, headers=lark_headers)
    if response.status_code == 200:
        print("Notification sent successfully.")
    else:
        print(f"Failed to send notification. Status code: {response.status_code}, Response text: {response.text}")
    #print(response.text)



class S3Uploader:
    def __init__(self, aws_access_key, aws_secret_key, s3_bucket_name):
        self.s3_client = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
        self.s3_bucket_name = s3_bucket_name

    def file_exists_in_s3(self, s3_object_key):
        try:
            self.s3_client.head_object(Bucket=self.s3_bucket_name, Key=s3_object_key)
            return True
        except:
            return False

    def upload_to_s3(self, local_file_path, s3_object_key):
        self.s3_client.upload_file(local_file_path, self.s3_bucket_name, s3_object_key)
        print(f"File {local_file_path} uploaded to S3 bucket {self.s3_bucket_name} with key {s3_object_key}")

def main():
    # 日期格式
    DATE_FORMAT = "%Y-%m-%d"

    # AWS S3 配置
    AWS_ACCESS_KEY = "AKIASEGBxxxxx"
    AWS_SECRET_KEY = "Myk97UInMQXXXX"
    S3_BUCKET_NAME = "pro-s3-db"

    # 数据库连接配置
    DB_HOST = "pro-rds-xxxx-slave.xxxx.com"
    DB_USER = "dwh_XXXX"
    DB_PASSWORD = "XXXXXXX"
    DB_NAME = "bos_x"


    # 凌晨5点触发脚本,计算前一天的日期
    yesterday = datetime.now() - timedelta(days=1)
    yesterday_str = yesterday.strftime(DATE_FORMAT)

    # 构建当月目录
    current_month_directory = yesterday.strftime("%Y.%m")

    # 生成 CSV 文件名,包含当月目录
    #csv_filename = f"dwh_balance_flow_infos_{yesterday_str}.csv"
    csv_filename = f"bos_balance_flow_{yesterday_str}.csv"

    # 检查本地文件是否已存在
    if os.path.exists(csv_filename):
        print(f"File {csv_filename} already exists locally. Skipping database query")
        # 初始化 S3 上传器
        s3_uploader = S3Uploader(AWS_ACCESS_KEY, AWS_SECRET_KEY, S3_BUCKET_NAME)

        # 构建 S3 对象键
        s3_object_key = f"{current_month_directory}/{csv_filename}"

        # 检查文件是否已存在于 S3 中
        if s3_uploader.file_exists_in_s3(s3_object_key):
            print(f"File {csv_filename} already exists in S3. Skipping upload.")

            # 发送通知到Lark
            file_size = os.path.getsize(csv_filename)
            file_size_mb = file_size / (1024 * 1024)  # Convert bytes to megabytes
            message = f"S3路径:/{s3_object_key} {file_size_mb:.2f} MB 检测已存在"
            send_notification("只读库bos_balance_flow备份成功:", message)
        else:
            # 上传文件至 S3
            s3_uploader.upload_to_s3(csv_filename, s3_object_key)

    else:
        # 使用 with 语句连接数据库
        with pymysql.connect(host=DB_HOST, user=DB_USER, password=DB_PASSWORD, database=DB_NAME) as connection:
            # 使用 pandas 读取数据库数据
            sql_query = f"SELECT * FROM bos_balance_flow  WHERE created_at >= '{yesterday_str} 00:00:00.000' AND created_at < '{yesterday_str} 23:59:59.999'"
            #sql_query = f"SELECT * FROM dwh_balance_flow_infos WHERE created_at >= '{yesterday_str} 00:00:00.000' AND created_at < '{yesterday_str} 23:59:59.999'"
            df = pd.read_sql_query(sql_query, connection)

        # 保存数据到 CSV 文件
        df.to_csv(csv_filename, index=False)

        # 初始化 S3 上传器
        s3_uploader = S3Uploader(AWS_ACCESS_KEY, AWS_SECRET_KEY, S3_BUCKET_NAME)

        # 构建 S3 对象键
        s3_object_key = f"{current_month_directory}/{csv_filename}"

        # 检查文件是否已存在于 S3 中
        if s3_uploader.file_exists_in_s3(s3_object_key):
            print(f"File {csv_filename} already exists in S3. Skipping upload !!")

            # 发送通知到Lark
            file_size = os.path.getsize(csv_filename)
            file_size_mb = file_size / (1024 * 1024)  # Convert bytes to megabytes
            message = f"S3路径:/{s3_object_key} {file_size_mb:.2f} MB 检测已存在"
            send_notification("只读库bos_balance_flow备份成功:", message)

        else:
            # 上传文件至 S3
            s3_uploader.upload_to_s3(csv_filename, s3_object_key)


            # 发送通知到Lark
            file_size = os.path.getsize(csv_filename)
            file_size_mb = file_size / (1024 * 1024)  # Convert bytes to megabytes
            message = f"S3路径:/{s3_object_key} {file_size_mb:.2f} MB"
            send_notification("只读库bos_balance_flow备份成功:", message)


if __name__ == "__main__":
    main()
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-12-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 查询
  • 添加索引
  • 历史数据上传s3
  • 每日数据上传s3
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档