前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Python实现MySQL和PG数据比对

通过Python实现MySQL和PG数据比对

作者头像
保持热爱奔赴山海
发布2022-11-28 15:42:39
6890
发布2022-11-28 15:42:39
举报
文章被收录于专栏:饮水机管理员饮水机管理员

生产上,有个需要从MySQL异构复制数据到PG中的需求。 数据同步组件用的是go-mysql-postgres (两位前同事基于社区开源的go-mysql-elasticsearch上做的PG功能补丁)。

目前测试环境异构数据复制已经跑起来了,但是还需要做下二者间的数据校验。 简单写了个python脚本,如下:

run.py  内容如下

代码语言:javascript
复制
# pip3 install psycopg2==2.9.4
# pip3 install mysql-connector-python==8.0.31

import mysql.connector
import psycopg2
import time
import configs
import hashlib

# TODO 待设计
# column_list = ["id","name","ctime","utime"]  # 需要校验的列。 当前的设计方法是直接select * ,这需要确保2端数据源的列名一致


start_time = time.time()

mydb = mysql.connector.connect(
    host=configs.mysql_host,
    port=configs.mysql_port,
    user=configs.mysql_user,
    passwd=configs.mysql_pass,
)

mysql_cursor = mydb.cursor()

# 获取当前最小 最大的id,用于后续的循环比对
get_min_max_sql = 'SELECT min(id),max(id),count(*) from ' + configs.mysql_db + '.' + configs.mysql_tb + ';'
mysql_cursor.execute(get_min_max_sql)
pk_range_result = mysql_cursor.fetchall()

for x in pk_range_result:
    min_id = x[0]
    max_id = x[1]
    count = x[2]

print(f"{configs.mysql_db}.{configs.mysql_tb} 最小id {min_id} , 最大id {max_id} , 总记录数 {count}")

start_id = min_id
stop_id = start_id + configs.step

# 记录差异行数量
diff_count = 0

while stop_id < max_id + configs.step:  # 加一个步长进去,防止因为step过大,导致有遗漏的id
    # 拼接出比对的SQL
    chksum_sql_4mysql = 'SELECT * FROM ' + configs.mysql_db + '.' + configs.mysql_tb + ' WHERE id >=' + str(
        start_id) + ' AND id < ' + str(stop_id) + ' ORDER BY id ASC;'

    chksum_sql_4pg = 'SELECT * FROM ' + configs.pg_schema + '.' + configs.pg_tb + ' WHERE id >=' + str(
        start_id) + ' AND id < ' + str(stop_id) + ' ORDER BY id ASC;'

    mysql_cursor.execute(chksum_sql_4mysql)
    mysql_chksum_result = mysql_cursor.fetchall()

    mysql_chksum = dict()
    for x in mysql_chksum_result:
        id = x[0]
        chk_sum = hashlib.md5(str(x).replace(' ', '').encode()).hexdigest()
        mysql_chksum[id] = chk_sum
    # print(f"MySQL校验和 {mysql_chksum}")

    # 连接PG进行数据校验
    pg_conn = psycopg2.connect(host=configs.pg_host,
                               port=configs.pg_port,
                               user=configs.pg_user,
                               password=configs.pg_pass,
                               database=configs.pg_db
                               )
    pg_cursor = pg_conn.cursor()
    pg_cursor.execute(chksum_sql_4pg)

    pg_chksum_result = pg_cursor.fetchall()

    pg_chksum = dict()
    for x in pg_chksum_result:
        id = x[0]
        chk_sum = hashlib.md5(str(x).replace(' ', '').encode()).hexdigest()
        pg_chksum[id] = chk_sum
    # print(f"PG校验和 {pg_chksum}")

    # 通过集合的比较,快速找出不一致的主键id
    if mysql_chksum != pg_chksum:
        differ = set(mysql_chksum.items()) ^ set(pg_chksum.items())
        s1 = set()
        for i in differ:
            s1.add(i[0])
        print(s1)
        with open('checksum_diff.log', 'a+') as f:
            f.write(str(s1, ) + '\n')

        diff_count = diff_count + len(s1)

    start_id = stop_id
    stop_id = stop_id + configs.step

stop_time = time.time()
time_dur = stop_time - start_time
print(f"比对 {count}条记录,总差异条数 {diff_count},耗时 {time_dur} 秒")

configs.py  内容如下

代码语言:javascript
复制
# MySQL数据源的信息
mysql_host = '192.168.31.181'
mysql_port = '3306'
mysql_user = 'dts'
mysql_pass = 'dts'
mysql_db = 'sbtest'
mysql_tb = 'sbtest1'

# PostgreSQL目标库的信息
pg_host = '192.168.31.182'
pg_port = '5432'
pg_user = 'dts'
pg_pass = 'dts'
pg_db = 'sbtest'
pg_schema = 'public'
pg_tb = 'sbtest1'


# 每次遍历的记录数
step = 2000  # 步长。 1表示逐条检测。这个值建议在1000-2000之间。

运行效果

通过Python实现MySQL和PG数据比对_postgresql
通过Python实现MySQL和PG数据比对_postgresql

走公网流量情况下,9k记录,在不同step下的耗时比对:

代码语言:javascript
复制
step = 100   18.5s
step = 500   5s
step = 1000  3.7s
step = 2000  3.3s
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档