前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用go-mysql-postgresql实现 MySQL实时同步数据到PG

使用go-mysql-postgresql实现 MySQL实时同步数据到PG

作者头像
保持热爱奔赴山海
修改2024-01-20 08:12:18
4.3K12
修改2024-01-20 08:12:18
举报
文章被收录于专栏:饮水机管理员饮水机管理员

MySQL to PG 的数据同步,可以通过canal 或者 bireme 来做,但是操作起来步骤都比较费事。

之前公司的同事,在go-mysql-elasticsearch的基础上,改了一下,将target从es改为了pg,工具名称叫做go-mysql-postgresql 。这个工具最大的好处就是一键部署使用,不依赖其它组件。

项目地址:https://github.com/frainmeng/go-mysql-elasticsearch

推荐使用这个版本:https://gitee.com/tangjunhu/go-mysql-postgres 【在上面的代码基础上,增加了MySQL分区表联合主键的等功能的支持】

下面是我的配置操作笔记:

1、 在源MySQL上开设同步专用的账号

代码语言:javascript
复制
grant replication slave, replication client,process ,select on *.* to dts@'%' identified by 'dts';

MySQL上面的表情况:
use testdb;
testdb >show create table t_order \G
*************************** 1. row ***************************
       Table: t_order
Create Table: CREATE TABLE `t_order` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `aid` int(10) unsigned NOT NULL,
  `uid` int(10) unsigned NOT NULL,
  `type` tinyint(3) unsigned NOT NULL,
  `status` tinyint(4) unsigned NOT NULL,
  `price` int(10) unsigned NOT NULL COMMENT '',
  `num` int(10) unsigned NOT NULL,
  `city` varchar(64) NOT NULL,
  `category` varchar(64) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `uid` (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=1000 DEFAULT CHARSET=utf8 ROW_FORMAT=COMPRESSED COMMENT=''
1 row in set (0.00 sec)

2、在PG上创建相同的表

代码语言:javascript
复制
create database testdb ;

\c testdb 

CREATE TABLE t_order (
  id bigint  NOT NULL,
  aid bigint  NOT NULL,
  uid bigint  NOT NULL,
  type bigint  NOT NULL,
  status bigint  NOT NULL,
  price bigint  NOT NULL ,
  num bigint  NOT NULL,
  city varchar(64) NOT NULL,
  category varchar(64) NOT NULL,
  PRIMARY KEY (id)
) ;

CREATE USER dts REPLICATION LOGIN CONNECTION LIMIT 10 ENCRYPTED PASSWORD 'dts'; 
grant connect on database testdb to dts;
grant usage on schema public to dts;
grant select on all tables in schema public to dts;  
grant all on table t_order to dts;

go-mysql-postgresql 的部署:

将文件解压到 /var/lib/pgsql/go-mysql-postgresql 目录里面。

vim /var/lib/pgsql/go-mysql-postgresql/master.info  将准备同步的binlog信息写入文件中

bin_name = "mysql-bin.000167"

bin_pos = 13389413

cat /var/lib/pgsql/go-mysql-postgresql/river.toml

代码语言:javascript
复制
# 源端MySQL连接配置
my_addr = "172.31.10.100:3306"
my_user = "dts"
my_pass = "dts"
my_charset = "utf8"

# 目的端PG连接配置
pg_host = "192.168.2.4"
pg_port = 5434
pg_user = "dts"
pg_pass = "dts"
pg_dbname = "testdb"

# 存放同步到的位移点的文件目录
data_dir = "./var"
# Inner Http status address
stat_addr = "192.168.2.4:12800"

# statsd monitor
statsd_host = "127.0.0.1"
statsd_port = 8125
statsd_prefix = "dbsync"

# 伪装成slave时候,配置的server-id
server_id = 1001
flavor = "mysql"

# minimal items to be inserted in one bulk
bulk_size = 1

# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "500ms"

# Ignore table without primary key
skip_no_pk_table = false
# concurrency conf
concurrent_size = 6
concurrent_ack_win = 2048

# MySQL data source
[[source]]
schema = "testdb"
tables = ["t_order"]

# 目标PG的连接配置
[[target]]
pg_name = "172.31.10.100_testdb_t_order"
pg_host = "192.168.2.4"
pg_port = 5434
pg_user = "dts"
pg_pass = "dts"
pg_dbname = "testdb"

# MySQL 数据到 PG 后的分发规则
[[rule]]
#mysql 库表的配置
schema = "testdb"
table = "t_order"
# pg 库表的配置
pg_schema = "public"
pg_table = "t_order"
# 下面这行很重要,标识了rule和target的绑定关系
pg_name = "172.31.10.100_testdb_t_order"

启动:

sh start.sh 即可

日志大致类似这样的:

2019/08/21 13:02:36 pgclient.go:199 pg delete event execute success! Schemapublic Tablet_order, Id166773984,result{0xc000182b00 1},reqId503

测试:

代码语言:javascript
复制
5k条记录, 走专线  从传输到写入到pg 用了33s
2019-08-20 23:33:29.289 CST [112184] LOG:  duration: 0.321 ms
2019-08-20 23:34:02.769 CST [112184] LOG:  duration: 0.085 ms


2w记录, 走专线  从传输到写入到pg 用了 140s
2019-08-20 23:35:20.216 CST [112189] LOG:  duration: 0.347 ms
2019-08-20 23:37:39.848 CST [85173] LOG:  duration: 6.648 ms

最后补充:

我们在做异构数据同步的时候,使用go-mysql-postgresql之前,通常情况下还需要将mysql老的数据全量同步过来,然后才能使用 go-mysql-postgresql来消费binlog达到同步数据的目的。 全量同步数据的方法,可以参考上一篇blog,地址: https://cloud.tencent.com/developer/article/1506977

20240119 补充:MySQL到PG的DDL工单自动化的逻辑

代码语言:python
复制
下面贴下核心的处理逻辑:
从工单平台查询最近5分钟内的工单,如果是异构复制的表的DDL,
则将DDL语法转为PG的DDL语法,并下发到PG去执行;执行成功或失败,都触发钉钉通知。

# 这里只贴了核心处理代码, 这里的i[3]是MySQL上的DDL工单明细
sql = (
	str(i[3])
	.replace("`", "")
	.replace("fast_insurance.", "")
	.replace(" datetime ", " timestamp ")
	.replace(" unsigned ", "  ")
)

pattern_1 = r"COMMENT\s+'.*?'"
new_sql = re.sub(pattern_1, "", sql)

pattern_2 = r"comment\s+'.*?'"
new_new_sql = re.sub(pattern_2, "", new_sql)

sql_list = new_new_sql.replace("\r\n", "").split(";")
print("拆分后的SQL --> ", sql_list)

"""
不足:
目前还不支持的类似下面的 MODIFY语法:
ALTER TABLE t2 
MODIFY ext2 varchar(256) DEFAULT NULL ,
MODIFY ext6 varchar(256) DEFAULT NULL ;
"""

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-09-06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
Elasticsearch Service
腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档