前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MYSQL INNODB ibd文件详解 (2) 提取DDL和DML

MYSQL INNODB ibd文件详解 (2) 提取DDL和DML

原创
作者头像
大大刺猬
发布2023-04-24 21:55:29
9710
发布2023-04-24 21:55:29
举报
文章被收录于专栏:大大刺猬大大刺猬

上一章学习了一些管理页.... 这一张来看看数据(INDEX_PAGE)页

基础知识

mysql数据和索引是放一起的, 主键索引记录主键值和剩余字段值, 二级索引(普通索引)记录 索引值和主键值.

FIL_PAGE_INDEX

FIL_PAGE_INDEX 结构如下

名字

大小

描述

FIL_HEADER

38

PAGE_HEADER

56

数据页信息, 比如有多少字段之类的

PAGE_DATA

用户记录(record)

PAGE_DIRECTORY

目录信息, 相当于用户记录的部分索引.

FIL_TRAILER

8

page_data里存具体的字段,含最大值和最小值

page_directory 每个slot2字节, 指向record的第一位(record按4-8个分一组, 组第一个成员记录组大小(owned))

注: page_directory记录的offset是不包含record及其之前的部分的.

看起来有点怪,但就是这样的....
看起来有点怪,但就是这样的....

FIL_PAGE_SDI

从上面知道, 要解析字段还需要字段可变字段数量才行, 也就是表字典信息, 8.0的字段信息在ibd文件里面就有, 就是FIL_PAGE_SDI. 所以我们只要解析SDI_PAGE就行.

REC_N_FIELDS_ONE_BYTE_MAX = 0x7F #超过(>)这个值, 就使用2字节 (就是第 1 bit位 标记是否使用2字节)

我并没有找到SDI_PAGE的结构信息, 不过不要紧, 官网写了SDI使用了压缩存储. 那我们就直接暴力解析获取字典信息吧....

代码语言:javascript
复制
def sdi_page_data(bdata):
        _sdi_offset = []
        isok = False
        for i in range(120,16384):
                if isok:
                        break
                for j in range(i,16384):
                        if isok:
                                break
                        try:
                                _test = zlib.decompress(bdata[i:j])
                                _sdi_offset.append(i)
                                i += len(_test)
                                if len(_sdi_offset) == 2:
                                        isok = True #break 2
                                break
                        except:
                                pass
        sdi_info = [json.loads(zlib.decompress(bdata[_sdi_offset[0]:]).decode()), json.loads(zlib.decompress(bdata[_sdi_offset[1]:]).decode())]
        return sdi_info

能获取字段信息了, 那就可以拼接处DDL了. 方向我给了代码的.文末(def get_ddl(data))

PYTHON解析出DDL和DML

不多数了, 直接看效果吧.

为了简单, 我只解析的cluster_index的叶子节点. 非叶子节点并没有解析(我又不查询数据...). 二级索引也没有解析(因为cluster index才记录了完整数据的...)

数据类型, 目前只支持 int 和 varchar, 其它类型需要读者自己去解析(比如date有3字节,年月日对应9:4:5 bit)

代码语言:javascript
复制
import innodb_index
filename = '/data/mysql_3314/mysqldata/db1/t20230424_666.ibd'
page_size = 16384
aa = innodb_index.rec_data_cluster(filename) #我默认就print DDL 了
print(len(aa))

for x in aa[:10]:
    print(x)
图中数据均为随机生成
图中数据均为随机生成

然后我们去数据库验证下吧

DDL对得上

数据量也对得上

忘记解析字符集了, 不过不要紧....
忘记解析字符集了, 不过不要紧....

前10行数据也对得上

数据是随机生成的哈
数据是随机生成的哈

来点猛的, 把原数据删掉, 用我们这个导入进去看下 (不要在生产环境试哈)

代码语言:javascript
复制
python>> len(aa)
python>> with open('/tmp/t20230424_666.sql','w') as f:
python>>     for x in aa:
python>>         _size = f.write(x)

shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'drop table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e "
CREATE Table db1.t20230424_666( 
id int not null,
name varchar(50),
addr varchar(100),
email varchar(100) 
,PRIMARY KEY(id)) engine=InnoDB comment='';"
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 < /tmp/t20230424_666.sql
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
为了方便,就使用的checksum
为了方便,就使用的checksum

总结

1. recorde按照4-8个为一组, 每组的第一个记录该组的大小, 并在slot记录自己(本组第一个)的位置(都是为的快速查询...)

2. mysql默认的int类型第一bit是记录正负的, 解析的时候要注意...

3. 解析sdi也可以使用官方的工具 sdi2ibd

4. 本文给的工具只支持部分数据类型. 解析的时候也没有用并发, 如果数据量大, 可以使用并发

附python源码

innodb_page_type.py 在上一篇文章

innodb_file.py 也在上一篇文章(注,均为大端, 有些地方可能写成小端了)

innodb_index.py 如下

代码语言:javascript
复制
#解析 FIL_PAGE_INDEX
import struct
import innodb_page_type
import innodb_file

import zlib,json

PAGE_SIZE = 16384

FIL_PAGE_DATA = 38
FIL_PAGE_DATA_END = 8

PAGE_NEW_INFIMUM = 99
PAGE_NEW_SUPREMUM = 112


PAGE_DIR_SLOT_MAX_N_OWNED = 8
PAGE_DIR_SLOT_MIN_N_OWNED = 4

REC_N_FIELDS_ONE_BYTE_MAX = 0x7F #超过(>)这个值, 就使用2字节 (就是第 1 bit位 标记是否使用2字节)

innodb_page_name = {}
for x in dir(innodb_page_type):
	if x[:2] != '__':
		innodb_page_name[getattr(innodb_page_type,x)] = x

def fseg_header(bdata):
	return struct.unpack('>LLH',bdata[:10])

def read_var_len(bdata,start):
	pass


#没找到sdi的结构, 但是发现它是压缩的, 所以我们使用暴力解压获取sdi数据吧....
#However, SDI data is compressed to reduce the storage footprint  https://dev.mysql.com/doc/refman/8.0/en/serialized-dictionary-information.html
#TODO sdi2ibd
def sdi_page_data(bdata):
	_sdi_offset = []
	isok = False
	for i in range(120,16384):
		if isok:
			break
		for j in range(i,16384):
			if isok:
				break
			try:
				_test = zlib.decompress(bdata[i:j])
				_sdi_offset.append(i)
				i += len(_test)
				if len(_sdi_offset) == 2:
					isok = True #break 2
				break
			except:
				pass
	sdi_info = [json.loads(zlib.decompress(bdata[_sdi_offset[0]:]).decode()), json.loads(zlib.decompress(bdata[_sdi_offset[1]:]).decode())]
	return sdi_info

#根据sdi信息返回表DDL语句
def get_ddl(data):
	ddl = f"CREATE {data[1]['dd_object_type']} {data[1]['dd_object']['schema_ref']}.{data[1]['dd_object']['name']}( \n"
	pk_list = []
	auto_key = None
	col_len = len(data[1]['dd_object']['columns']) - 2
	_col_len = 0
	coll = []
	for col in data[1]['dd_object']['columns']:
		if col['name'] in ['DB_TRX_ID','DB_ROLL_PTR']:
			continue
		if col['is_nullable']:
			ddl += f"{col['name']} {col['column_type_utf8']}"
		else:
			ddl += f"{col['name']} {col['column_type_utf8']} not null"
		_col_len += 1
		coll.append(col['name'])
		if col['is_auto_increment']:
			ddl += ' auto_increment,\n'
		elif _col_len < col_len:
			ddl += ',\n'
		else:
			ddl += ' \n'

	index_ddl = ''
	for i in data[1]['dd_object']['indexes']:
		name = "PRIMARY KEY" if i['name'] == 'PRIMARY' else  f"index {i['name']}"
		index_ddl += f",{name}("
		for x in i['elements']:
			if x['length'] < 4294967295:
				index_ddl += f"{coll[x['column_opx']]},"
		index_ddl = index_ddl[:-1] #去掉最后一个,
		index_ddl += ")"
	
	ddl += index_ddl
	ddl += f") engine={data[1]['dd_object']['engine']} comment='{data[1]['dd_object']['comment']}';"	
	return ddl

def get_rec_data(bdata,columns,index):
	rdata = []
	page0 = page_index(bdata)
	index_ = [] 
	for x in index['elements']:
		if x['length'] < 4294967295:
			index_.append(x['column_opx'])
	len_column = len(columns) - 2 #去掉DB_TRX_ID(6)  DB_ROLL_PTR(7)
	for x in page0.records:
		rec_header = rec_extra_header(bdata[x-5:x])
		if rec_header.deleted:
			#print('deleted')
			continue #删除的数据就不要了
		tdata = [ None for x in range(len_column) ]
		null_bitmask_size = int((len_column+7)/8)
		null_bitmask = int.from_bytes(bdata[x-null_bitmask_size-5:x-5],'big')
		toffset = x-5-null_bitmask_size
		doffset = x
		#print(x,toffset,doffset,null_bitmask)
		#read_key:
		for k in index_: #遍历索引字段
			ktype = columns[k]['type']
			if ktype in [16,]:#变长...
				ksize = bdata[toffset-1:toffset] #懒得考虑超过128的情况了...
				if ksize > REC_N_FIELDS_ONE_BYTE_MAX:
					ksize = bdata[toffset-2:toffset]
					toffset -= 1
				toffset -= 1
				#print(toffset)
				tdata[k] = bdata[doffset:doffset+ksize].decode()
				doffset += ksize
			elif ktype in [15,]: #date
				tdata[k] = bdata[doffset:doffset+3]
				doffset += 3

			elif ktype in [4,]: #DATA_BINARY 4字节 第一bit是记录正负
				_t = struct.unpack('>L',bdata[doffset:doffset+4])[0]
				tdata[k] = (_t&((1<<31)-1)) if _t&(1<<31) else -(_t&((1<<31)-1))
				#tdata[k] = bdata[doffset:doffset+4]
				doffset += 4

		doffset += 6 + 7
		#遍历其它数据,
		for k in range(len_column):
			if k in index_: #索引已经记录了数据了
				continue
			if null_bitmask&(1<<k): #空字段
				continue
			ktype = columns[k]['type']
			#print(columns[k]['name'],toffset)
			if ktype in [16,]:#变长...
				ksize = struct.unpack('>B',bdata[toffset-1:toffset])[0] #懒得考虑超过128的情况了...
				if ksize > REC_N_FIELDS_ONE_BYTE_MAX:
					ksize = struct.unpack('>H',bdata[toffset-2:toffset])[0]
					toffset -= 1
				toffset -= 1
				tdata[k] = bdata[doffset:doffset+ksize].decode()
				doffset += ksize
			elif ktype in [15,]: #date
				tdata[k] = bdata[doffset:doffset+3]
				doffset += 3

			elif ktype in [4,]: #DATA_BINARY 4字节
				_t = struct.unpack('>L',bdata[doffset:doffset+4])[0]
				tdata[k] = (_t&((1<<31)-1)) if _t&(1<<31) else -(_t&((1<<31)-1))
				#tdata[k] = bdata[doffset:doffset+4]
				doffset += 4
		rdata.append(tdata)
		#break
		
	return rdata
			

#数据类型 storage/innobase/include/data0type.h
def rec_data_cluster(filename): #主键索引必须显示主键, 
	"""
	filename: 文件名
	"""
	f = open(filename,'rb')
	f.seek(PAGE_SIZE*3,0)
	sdi_info = sdi_page_data(f.read(PAGE_SIZE))
	ddl = get_ddl(sdi_info)
	print(ddl)
	db_table = f'{sdi_info[1]["dd_object"]["schema_ref"]}.{sdi_info[1]["dd_object"]["name"]}'
	index = sdi_info[1]['dd_object']['indexes'][0]
	columns = sdi_info[1]['dd_object']['columns']
	root_page = int(index['se_private_data'].split('root=')[1].split(';')[0])
	#f.seek(PAGE_SIZE*root_page,0)
	#只需要找到第一个叶子节点, 然后解析叶子节点的数据即可. (inode不是就记录了第一个叶子节点么.....)
	f.seek(2*PAGE_SIZE,0)
	inode = innodb_file.inode(f.read(PAGE_SIZE)[38:PAGE_SIZE-8])
	leaf_page = 0
	for x in inode.index:
		if x['no_leaf'] == root_page:
			leaf_page = x['leaf']
			break
	
	rdata = [] #[(col1,col2),(col1,col2)] 
	next_page_number = leaf_page
	while True:
		if next_page_number == 4294967295:
			break
		#print('PAGE_NUM:',next_page_number)
		f.seek(next_page_number*PAGE_SIZE,0)
		page0 = page(f.read(PAGE_SIZE))
		next_page_number = page0.FIL_PAGE_NEXT
		rdata += get_rec_data(page0.bdata,columns,index)
		#break

	rdata_sql = []
	for x in rdata:
		_v = ''
		for j in x:
			_v += f'{j},' if isinstance(j,int) else f'"{j}",'
		_v = _v[:-1]
		_sql = f'insert into {db_table} values({_v});'
		rdata_sql.append(_sql)
	return rdata_sql


#storage/innobase/rem/rec.h
REC_INFO_MIN_REC_FLAG = 0x10
REC_INFO_DELETED_FLAG = 0x20
REC_N_OWNED_MASK = 0xF
REC_HEAP_NO_MASK = 0xFFF8
REC_NEXT_MASK = 0xFFFF
#REC_STATUS_ORDINARY 0
#REC_STATUS_NODE_PTR 1
#REC_STATUS_INFIMUM 2
#REC_STATUS_SUPREMUM 3
class rec_extra_header(object):
	def __init__(self,bdata):
		if len(bdata) != 5:
			return False
		fb = struct.unpack('>B',bdata[:1])[0]
		self.deleted = True if fb&REC_INFO_DELETED_FLAG else False  #是否被删除
		self.min_rec = True if fb&REC_INFO_MIN_REC_FLAG else False #if and only if the record is the first user record on a non-leaf
		self.owned = fb&REC_N_OWNED_MASK # 大于0表示这个rec是这组的第一个, 就是地址被记录在page_directory里面
		self.heap_no = struct.unpack('>H',bdata[1:3])[0]&REC_HEAP_NO_MASK #heap number, 0 min, 1 max other:rec
		self.record_type = struct.unpack('>H',bdata[1:3])[0]&((1<<3)-1) #0:rec 1:no-leaf 2:min 3:max
		self.next_record = struct.unpack('>H',bdata[3:5])[0]
	def __str__(self):
		return f'deleted:{self.deleted}  min_rec:{self.min_rec}  owned:{self.owned}  heap_no:{self.heap_no}  record_type:{self.record_type}  next_record:{self.next_record}'

class page(object):
	def __init__(self,bdata):
		if len(bdata) != PAGE_SIZE:
			return None

		self.FIL_PAGE_SPACE_OR_CHKSUM, self.FIL_PAGE_OFFSET, self.FIL_PAGE_PREV, self.FIL_PAGE_NEXT, self.FIL_PAGE_LSN, self.FIL_PAGE_TYPE, self.FIL_PAGE_FILE_FLUSH_LSN = struct.unpack('>4LQHQ',bdata[:34])
		self.FIL_PAGE_SPACE_ID = struct.unpack('>L',bdata[34:38])[0]
		
		self.CHECKSUM, self.FIL_PAGE_LSN = struct.unpack('>2L',bdata[-8:])
		self.bdata = bdata

	def fil_header(self):
		return f'PAGE_SPACE_ID:{self.FIL_PAGE_SPACE_ID}  PAGE_TYPE:{innodb_page_name[self.FIL_PAGE_TYPE]} PREV:{self.FIL_PAGE_PREV}  NEXT:{self.FIL_PAGE_NEXT}'


	def fil_trailer(self):
		return f'CHECKSUM:{self.CHECKSUM}  PAGE_LSN:{self.FIL_PAGE_LSN}'


class page_index(page):
	def __init__(self,bdata):
		super().__init__(bdata)


		self.cols = [] #字段类型列表.

		#PAGE_HEADER
		bdata = self.bdata[FIL_PAGE_DATA:PAGE_SIZE-FIL_PAGE_DATA_END]
		self.PAGE_N_DIR_SLOTS, self.PAGE_HEAP_TOP, self.PAGE_N_HEAP, self.PAGE_FREE, self.PAGE_GARBAGE, self.PAGE_LAST_INSERT, self.PAGE_DIRECTION, self.PAGE_N_DIRECTION, self.PAGE_N_RECS, self.PAGE_MAX_TRX_ID, self.PAGE_LEVEL, self.PAGE_INDEX_ID = struct.unpack('>9HQHQ',bdata[:36])
		self.PAGE_BTR_SEG_LEAF = fseg_header(bdata[36:46])
		self.PAGE_BTR_SEG_TOP = fseg_header(bdata[46:56])


		#PAGE_DIRECTORY (这两字节指向的数据位置, 不包含数据前面的 5-byte header  就是REC_N_NEW_EXTRA_BYTES)
		page_directorys = []
		for x in range(int(PAGE_SIZE/2)):
			tdata = struct.unpack('>H',self.bdata[-(2+FIL_PAGE_DATA_END+x*2):-(FIL_PAGE_DATA_END+x*2)])[0]
			page_directorys.append(tdata)
			if tdata == PAGE_NEW_SUPREMUM: 
				break #slot遍历完成
		self.page_directorys = page_directorys



		#RECORDS(PAGE_DATA)  innodb_default_row_format
		offset = self.page_directorys[:1][0] #第一字段, 虚拟的...
		records = []
		while True:
			record_type = self.bdata[offset-3:offset-2]
			if record_type == b'\x03' or record_type == b'': #00 普通rec(leaf),  01 no_leaf   02 min_rec  03 max_rec
				break
			records.append(offset)
			offset += struct.unpack('>H',self.bdata[offset-2:offset])[0]
		records.remove(PAGE_NEW_INFIMUM) #去掉第一个页(虚拟的页,你把握不住)
		self.records = records


	def get_record(self,rec_offset):
		"""
		根据用户给的偏移量返回对于的数据
		"""
		pass

	def find_data_with_index(index_value):
		"""
		根据用户给的index值查找数据 找page, 然后通过二分法找rec(利用slot)
		"""
		pass


	def record(self):
		return f'RECORDS:{len(self.records)}'

	def page_header(self):
		return f'SLOTS:{self.PAGE_N_DIR_SLOTS}  PAGE_LEVEL:{self.PAGE_LEVEL}  INDEX_ID:{self.PAGE_INDEX_ID}  RECORDS:{self.PAGE_N_RECS}  PAGE_HEAP_TOP:{self.PAGE_HEAP_TOP}  PAGE_GARBAGE(deleted):{self.PAGE_GARBAGE}  PAGE_FREE:{self.PAGE_FREE}'


	def page_directory(self):
		return f'SLOTS:{len(self.page_directorys)}   MAX:{self.page_directorys[-1:]}  MIN:{self.page_directorys[:1]}'

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基础知识
  • FIL_PAGE_INDEX
  • FIL_PAGE_SDI
  • PYTHON解析出DDL和DML
  • 总结
  • 附python源码
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档