前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于python的mysql复制工具

基于python的mysql复制工具

作者头像
用户1278550
发布2019-07-01 17:03:03
2.5K0
发布2019-07-01 17:03:03
举报
文章被收录于专栏:idba

一 简介

python-mysql-replication 是基于python实现的 MySQL复制协议工具,我们可以用它来解析binlog 获取日志的insert,update,delete等事件 ,并基于此做其他业务需求。比如数据更改时失效缓存,监听dml事件通知下游业务方做对应处理。

其项目信息

代码语言:javascript
复制
网址     http://www.github.com/noplay/python-mysql-replication官方文档  https://python-mysql-replication.readthedocs.io

二 实践

2.1 安装配置

获取源代码

git clone http://www.github.com/noplay/python-mysql-replication

使用pip 安装

pip install mysql-replication

权限: 可以直接使用复制账号也可以使用其他账号,但是该账号必须SELECT, REPLICATION SLAVE, REPLICATION CLIENT权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'replicator'@'%' IDENTIFIED BY 'xxxxx';

数据库日志相关的参数设置如下:

log_bin=on ,binlog_format=row,binlog_row_image=FULL

2.2 核心类介绍

python-mysql-replication 的入口是类BinLogStreamReader(),我们在使用该工具时需要实例化一个BinLogStreamReader()对象 stream,BinLogStreamReader 通过 ReportSlave 向主库注册作为一个slave角色,用于接受MySQL的binlog广播。有兴趣的可以研究其代码具体实现。

该实例提供解析 binlog 各种事件的集合,每个事件也是一个对象。

初始化BinLogStreamReader()实例需要使用的参数如下:

代码语言:javascript
复制
connection_settings: 数据库的连接配置信息resume_stream:从位置或binlog的最新事件或旧的可用事件开始log_file:设置复制开始日志文件log_pos:设置复制开始日志pos(resume_stream应该为true)auto_position:使用master_auto_position gtid设置位置blocking:如果设置为True,会持续监听binlog事件,如果设置为False 则会一次性解析所有可获取的binlog。only_events:只解析指定的事件 比如only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],参数类型是一个数组。
#### 以上是比较常用的参数
ignored_events:设置哪些事件可以被忽略。也是一个数组。
only_tables,ignored_tables,only_schemas,ignored_schemas ##根据字面意思理解
freeze_schema:如果为true,则不支持ALTER TABLE速度更快。skip_to_timestamp:在达到指定的时间戳之前忽略所有事件,否则会解析所有可访问的binlogreport_slave:用于向主库注册SHOW SLAVE HOSTS中slave,该值可以是字典比如{'hostname':'127.0.0.1', 'username':'root', 'password':'rep', 'port':3306}
slave_uuid:在SHOW SLAVE HOSTS中slave_uuid。fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常。
2.3 如何使用呢?

最简单的用法 脚本名 pyreplica.py

代码语言:javascript
复制
from pymysqlreplication import BinLogStreamReaderMYSQL_SETTINGS = {    "host": "127.0.0.1",    "port": 3306,    "user": "root",    "passwd": ""}
def main():    # server_id is your slave identifier, it should be unique.    # set blocking to True if you want to block and wait for the next event at    # the end of the stream    stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,                                server_id=3,                                blocking=True)
    for binlogevent in stream:        binlogevent.dump()    stream.close() ###如果blocking=True ,改行记录可以不用。if __name__ == "__main__":    main()

开启两个窗口,一个窗口执行,另外一个窗口操作mysql 写入或者修改数据

python pyreplica.py

输出如下:

代码语言:javascript
复制
=== GtidEvent ===Date: 2019-06-25T17:41:34Log position: 339Event size: 42Read bytes: 25Commit: FalseGTID_NEXT: cc726403-93d1-11e9-90b7-ecf4bbde7778:13()=== QueryEvent ===Date: 2019-06-25T17:41:34Log position: 411Event size: 49Read bytes: 49Schema: testExecution time: 0Query: BEGIN()=== TableMapEvent ===Date: 2019-06-25T17:41:34Log position: 456Event size: 22Read bytes: 21Table id: 126Schema: testTable: xColumns: 2()=== WriteRowsEvent ===Date: 2019-06-25T17:41:34Log position: 500Event size: 21Read bytes: 12Table: test.xAffected columns: 2Changed rows: 1Values:--('*', u'a', ':', 1)('*', u'id', ':', 18)()=== XidEvent ===Date: 2019-06-25T17:41:34Log position: 531Event size: 8Read bytes: 8Transaction ID: 1293393()
2.3 拓展

基于该工具提供的日志事件解析我们可以做很多事情,比较有名的工具 binlog2sql 利用该工具解析binlog 做数据回滚 。

mysql-replication.py

代码语言:javascript
复制
#!/usr/bin/env python# -*- coding: utf-8 -*-
from pymysqlreplication import BinLogStreamReaderfrom pymysqlreplication.row_event import (    DeleteRowsEvent,    UpdateRowsEvent,    WriteRowsEvent,)import sysimport json
mysql_settings = {'host': '127.0.0.1','port': 3306,                   'user': 'replica', 'passwd': 'xxxx'}def main():
    stream = BinLogStreamReader(        connection_settings=mysql_settings,        server_id=1,        blocking=True,        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
    for binlogevent in stream:        for row in binlogevent.rows:            event = {"schema": binlogevent.schema, "table": binlogevent.table, "log_pos": binlogevent.packet.log_pos}            if isinstance(binlogevent, DeleteRowsEvent):                event["action"] = "delete"                event["values"] = dict(row["values"].items())                event = dict(event.items())            elif isinstance(binlogevent, UpdateRowsEvent):                event["action"] = "update"                event["before_values"] = dict(row["before_values"].items())                event["after_values"] = dict(row["after_values"].items())                event = dict(event.items())            elif isinstance(binlogevent, WriteRowsEvent):                event["action"] = "insert"                event["values"] = dict(row["values"].items())                event = dict(event.items())            print json.dumps(event)            sys.stdout.flush()
if __name__ == "__main__":    main()

执行脚本结果 如下图

除了解析binlog,我们还可以用python-mysql-replication 做数据全量加增量迁移。比如仅仅迁移某些大表而不是整个库的时候,可以用到。有兴趣的朋友可以想想大概的算法。

-The End-

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-06-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 yangyidba 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一 简介
  • 二 实践
    • 2.1 安装配置
      • 2.2 核心类介绍
        • 2.3 如何使用呢?
          • 2.3 拓展
          相关产品与服务
          访问管理
          访问管理(Cloud Access Management,CAM)可以帮助您安全、便捷地管理对腾讯云服务和资源的访问。您可以使用CAM创建子用户、用户组和角色,并通过策略控制其访问范围。CAM支持用户和角色SSO能力,您可以根据具体管理场景针对性设置企业内用户和腾讯云的互通能力。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档