前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[MYSQL] mysql怎样单表导入? && 从binlog提取指定表

[MYSQL] mysql怎样单表导入? && 从binlog提取指定表

原创
作者头像
大大刺猬
发布2024-03-01 09:15:16
1580
发布2024-03-01 09:15:16
举报
文章被收录于专栏:大大刺猬大大刺猬

导读

上一篇文章介绍了 并发导入, 但还有一种需求是 只恢复特定的某张表. 比如这张表有坏块啊, 或者其它啥需求, 反正就是要恢复这张表, 但是又没单独备份这张表.

只有 一个定时全备(mysqldump)

那要怎么恢复呢???

分析

上一篇介绍的 mysqldump拆分脚本 还支持 仅拆分出来指定的表, 然后我们再从binlog中解析出指定的表做恢复即可.

也就是说现在 只要从binlog中提取指定的表即可. 但客户环境可能不允许使用 binlog2sql或者my2sql等工具.....

没事, binlog文件结构不复杂, 自己写个简单脚本提取指定的Binlog即可. 好在之前有解析过binlog文件: https://cloud.tencent.com/developer/article/2237558

binlog文件提取指定的表

我们指定binlog是由 很多event组成的, 而一个事务由 GTID_EVENT 和众多 ROW_EVENT 再加上XID_EVENT 组成, 而ROW_EVENT 又细分为 DELETE_EVENT, WRITE_EVENT, UPDATE_EVENT. 每个ROW_EVENT还有个TABLE_MAP_EVENT来记录元数据信息, 比如字段类型, 表名等信息 (不包含字段名字)

所以我们只需要解析出这几个EVENT即可. 再加上第一个EVENT描述信息即可.

每个EVENT都由 HEADERPAYLOAD 组成. HEADER结构如下:

对象

大小(bytes)

描述

timestamp

4

event生成时间

event_type

1

event类型

server_id

4

产生这个event的server_id

event_size

4

event大小(含header的19bytes)

log_pos

4

event的结束位置(pos)

flags

2

flags

PAYLOAD的话, 每个EVENT都不一样, 这里只看 TABLE_MAP_EVENT的一部分, 因为本次解析binlog只涉及到这一丢丢

名字

大小(字节)

描述

table_id

int<6>

表打开的id, 不是数据库里面的table_id

flags

2

保留字段

database_name_length

可变长度

数据库名长度

database_name

取决于database_name_length

数据库名(以0x00结尾, 这个字节不计算在database_name_length中)

table_name_length

可变长度

表名长度

table_name

取决于table_name_length

表名(以额外的0x00结尾, 就是不在table_name_length的计算中)

column_count

可变长度

多少个字段

column_type_list

取决于column_count

list类型, 每个字段的数据类型,用1字节表示(比如3表示int<4> 详情)

.....

.....

暂时用不上其它的

脚本编写思路

简单说一下脚本编写思路, 毕竟每个人的思路都不一样

代码语言:note
复制
读取BINLOG文件
如果是 table_map_event
	就解析payload, 得到库表名字, 并保存下来
		如果 匹配表名成功 则记录该EVENT
如果是 row_event
	则匹配刚才的库表信息, 成功则记录下来

如果是GTID_EVENT
	直接记录下来, 这是事务的开头

如果是 XID_EVENT
	记录下来
	判断 记录的event队列长度, 大于等于4 则表示 又成功匹配上的表 则写入新文件
	清空event队列
其它EVENT 则跳过(seek)

思路还是比较简单的. 就是匹配.

测试

从mysqldump中拆分出指定的表

使用--database--table 匹配需要的表名信息

代码语言:shell
复制
python MysqlDumpSplitSQL.py t20240228_alldb.sql --database ibd2sql --table ddcw_alltype_table

然后导入数据库

代码语言:shell
复制
mysql -h127.0.0.1 -P3314 -p123456 < /root/mysqldump_t20240226/splitByddcw_20240301_084906/dbs/ibd2sql/ddcw_alltype_table.sql

从Binlog提取指定的表

用法和上一个脚本一样

使用--database--table 匹配需要的表名信息

代码语言:shell
复制
python binlogFtable.py /data/mysql_3314/mysqllog/binlog/m3314.000002 --database ibd2sql --table ddcw_alltype_table

然后导入数据库, 由于带有gtid, 故直接导入数据库会失败, 我们需要使用--skip-gtids来忽略gtid信息

代码语言:shell
复制
mysqlbinlog --skip-gtids /root/mysqldump_t20240226/BinlogFtableByddcw_20240301_085208/m3314.000002 | mysql -h127.0.0.1 -P3314 -p123456

验证

发现我们解析出来的哪张表数据确实是执行binlog之后的了, 证明这个方法是可行的. (我这里只有一个delete操作, 是为了方面演示, 实际环境可能是一大堆DML操作)

总结

本次 通过拆分 mysqldump导出的数据, 然后提取binlog指定的表, 最后使用mysqlbinlog来解析binlog并导入数据库 来实现单表恢复. 看起来流程还是很麻烦的. 但原理还是简单, 就是匹配指定的表, 然后重新回放.

当然如果又备库的话, 直接从备库导出更方便.

附脚本

mysqldump拆分脚本

binlog提取指定表脚本如下:

代码语言:python
复制
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# write by ddcw @https://github.com/ddcw
# 从binlog里面过滤出指定的表信息. 会破坏事务的完整性, 谨慎使用!!!

import sys,os
import re
import datetime,time
import errno
import argparse,glob
import struct

def _argparse():
	parser = argparse.ArgumentParser(add_help=True, description='从 binlog 提取出指定表出来')
	parser.add_argument('--version', '-v', '-V', action='store_true', dest="VERSION", default=False, help='版本信息')
	parser.add_argument('--database', dest="DATABASE", default="*", help='数据库')
	parser.add_argument('--table', dest="TABLE", default="*", help='表')
	parser.add_argument('--output-dir', dest="OUTPUT_DIR", default="", help='输出的目录')
	parser.add_argument("files", nargs="*", help="要提取的原始BINLOG文件")
	if parser.parse_args().VERSION:
		print('VERSION: v0.1')
		sys.exit(0)
	return parser.parse_args()

def mkdir_exists(dirname):
	try:
		os.makedirs(dirname)
	except OSError as e:
		if e.errno != errno.EEXIST:
			raise

def match_table(dbname,tablename):
	dtname = str(dbname) + "." + str(tablename)
	return True if re.search(PATTERN,dtname) else False

#把event写入binlog文件, 主要是修改POS信息
def write_event(event_header,event_payload,f):
	timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",event_header[0:19])
	event_size = len(event_header) + len(event_payload)
	log_pos = f.tell() + event_size
	event_header = struct.pack("<LBLLLh", timestamp, event_type, server_id, event_size, log_pos, flags)
	f.write(event_header)
	f.write(event_payload)

def read_tablename_dbname(bdata):
	offset = 8 #table_id + flag
	dbname_length = struct.unpack('<B', bdata[offset:offset+1])[0]
	offset += 1
	dbname =  bdata[offset:offset+dbname_length].decode()
	offset += dbname_length + 1 #\x00结尾
	tablename_length = struct.unpack('<B', bdata[offset:offset+1])[0]
	offset += 1
	tablename =  bdata[offset:offset+tablename_length].decode()
	return dbname, tablename

def event_header(bdata):
	return 
	timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata[0:19])

if __name__ == '__main__':
	parser = _argparse()
	FILTER_DATABASE = parser.DATABASE
	FILTER_TABLE = parser.TABLE
	PATTERN = str(FILTER_DATABASE).replace("*",".*") + "\." + str(FILTER_TABLE).replace("*",".*")
	filelist = []
	for pattern in parser.files:
		filelist += glob.glob(pattern)
	filelist.sort() #其实没必要排序的
	OUTPUT_DIR = parser.OUTPUT_DIR if parser.OUTPUT_DIR != "" else "BinlogFtableByddcw_" + str(datetime.datetime.now().strftime("%Y%m%d_%H%M%S"))
	OUTPUT_DIR = os.path.abspath(OUTPUT_DIR)
	mkdir_exists(OUTPUT_DIR)

	#开始读binlog匹配了
	for FILENAME_SOURCE in filelist:
		FILENAME_DEST = os.path.join(OUTPUT_DIR, os.path.basename(FILENAME_SOURCE))
		print(str(FILENAME_SOURCE)+" --> "+FILENAME_DEST)
		fs = open(FILENAME_SOURCE,'rb')
		fd = open(FILENAME_DEST,'wb')
		if fs.read(4) != b'\xfebin':
			f.seek(0,0) #relay log 就不需要跳过4字节开头了.
		fd.write(b'\xfebin')

		bdata_header = fs.read(19)
		timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata_header)
		bdata_payload = fs.read(event_size - 19)
		fd.write(bdata_header)
		fd.write(bdata_payload)
		

		CURRENT_TABLE = ""
		CURRENT_DATABASE = ""
		EVENT_LIST = [] #保存解析的BINLOG, 如果 >= 4 就表示有匹配成功的表了. 就写入文件
		while True:
			#read event_header
			bdata_header = fs.read(19)
			if bdata_header == b'' or len(bdata_header) < 19:
				break
			timestamp, event_type, server_id, event_size, log_pos, flags = struct.unpack("<LBLLLh",bdata_header)
			if event_type == 19: #table_map event
				CURRENT_TABLE_MAP_EVENT = []
				bdata_payload = fs.read(event_size - 19)
				CURRENT_DATABASE,CURRENT_TABLE = read_tablename_dbname(bdata_payload)
				if match_table(CURRENT_DATABASE, CURRENT_TABLE):
					EVENT_LIST.append((bdata_header,bdata_payload))
				else:
					fs.seek(log_pos,0)

			elif event_type == 16: #XID_EVENT
				bdata_payload = fs.read(event_size - 19)
				EVENT_LIST.append((bdata_header,bdata_payload))
				if len(EVENT_LIST) >= 4: #event数量足, 说明有匹配上的表
					for event in EVENT_LIST:
						write_event(event[0],event[1],fd)
				EVENT_LIST = [] #置空

			elif event_type == 33: #GTID_EVENT
				bdata_payload = fs.read(event_size - 19)
				EVENT_LIST.append((bdata_header,bdata_payload))

			elif event_type == 30 or event_type == 31 or event_type == 32: #ROW_EVENT
				if match_table(CURRENT_DATABASE, CURRENT_TABLE):
					bdata_payload = fs.read(event_size - 19)
					EVENT_LIST.append((bdata_header,bdata_payload))
				else:
					fs.seek(log_pos,0)

			elif event_type == 35: #给PREVIOUS_GTIDS_LOG_EVENT放个VIP通道 -_-
				bdata_payload = fs.read(event_size - 19)
				write_event(bdata_header,bdata_payload,fd)
			else:
				fs.seek(log_pos,0)

		fs.close()
		fd.close()
	print(OUTPUT_DIR)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 导读
  • 分析
  • binlog文件提取指定的表
  • 脚本编写思路
  • 测试
    • 从mysqldump中拆分出指定的表
      • 从Binlog提取指定的表
        • 验证
        • 总结
        • 附脚本
        相关产品与服务
        云数据库 MySQL
        腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档