The previous chapter covered some of the management pages. This chapter looks at the data page (INDEX_PAGE).
MySQL stores data and indexes together: the clustered (primary key) index stores the primary key value plus all remaining column values, while a secondary (ordinary) index stores the indexed value plus the primary key value.
The FIL_PAGE_INDEX structure is as follows:
Name | Size (bytes) | Description |
---|---|---|
FIL_HEADER | 38 | file header |
PAGE_HEADER | 56 | page metadata, e.g. how many records the page holds |
PAGE_DATA | variable | the user records |
PAGE_DIRECTORY | variable | directory slots: a sparse index over the user records |
FIL_TRAILER | 8 | checksum and the low 32 bits of the LSN |
PAGE_DATA holds the actual records, including the virtual minimum and maximum records (infimum and supremum).
Each PAGE_DIRECTORY slot is 2 bytes and points to the first record of a group (records are grouped 4 to 8 per group; the group's first member records the group size in its owned field).
Note: the offset stored in a PAGE_DIRECTORY slot does not include the record's 5-byte header, i.e. it points directly at the record data.
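To make the layout concrete, here is a minimal sketch (the function name is mine) that walks the slot array backwards from the tail of a raw 16 KiB page:

# Minimal sketch: list the PAGE_DIRECTORY slots of one raw 16 KiB page.
# The slot array sits just before the 8-byte FIL_TRAILER and grows backwards,
# 2 bytes per slot; the slot nearest the trailer points at the infimum (99)
# and the last one read points at the supremum (112).
import struct

PAGE_SIZE = 16384
FIL_TRAILER = 8
PAGE_NEW_SUPREMUM = 112

def read_slots(page: bytes):
    slots = []
    pos = PAGE_SIZE - FIL_TRAILER
    while pos >= 2:
        pos -= 2
        slot = struct.unpack('>H', page[pos:pos + 2])[0]
        slots.append(slot)
        if slot == PAGE_NEW_SUPREMUM:  # the last slot always points here
            break
    return slots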
From the above it follows that to parse the fields we also need to know which columns are variable-length, i.e. the table's dictionary information. In 8.0 the column metadata lives inside the ibd file itself, in the FIL_PAGE_SDI page, so we only have to parse the SDI page.
REC_N_FIELDS_ONE_BYTE_MAX = 0x7F # above (>) this value a 2-byte length is used (the first bit of the byte flags whether 2 bytes are used)
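As a sketch of that rule (the helper name is mine; the module at the end implements the same logic as read_var_len): length bytes sit in the record header and are read back to front, and a first byte above 0x7F carries the high bits of a 2-byte length.

# Sketch: decode one variable-length column length ending at `pos` (exclusive).
# If the byte read first has bit 0x80 set, its low 6 bits are the high part of
# a 2-byte length (bit 0x40 would flag externally stored data; ignored here).
def decode_var_len(page: bytes, pos: int):
    b1 = page[pos - 1]
    pos -= 1
    if b1 > 0x7F:  # REC_N_FIELDS_ONE_BYTE_MAX
        return ((b1 & 0x3F) << 8) | page[pos - 1], pos - 1
    return b1, pos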
I couldn't find the SDI page's structure documented anywhere, but no matter: the manual says SDI data is stored compressed, so we can just brute-force decompress it to recover the dictionary information....
def sdi_page_data(bdata):
    # Brute force: the SDI page layout is undocumented, so probe each offset until
    # zlib accepts a complete stream starting there. decompressobj lets us skip
    # past a stream we already found via unused_data.
    _sdi_offset = []
    i = 120 # skip the FIL header and the uncompressed head of the SDI records
    while i < 16384 and len(_sdi_offset) < 2:
        d = zlib.decompressobj()
        try:
            d.decompress(bdata[i:])
        except zlib.error:
            i += 1
            continue
        if not d.eof: # stream didn't finish: not a real stream start
            i += 1
            continue
        _sdi_offset.append(i)
        i = len(bdata) - len(d.unused_data) # jump past the stream just found
    # zlib.decompress ignores bytes after the end of a stream, so decompressing
    # from the recorded offset to the end of the page is safe.
    sdi_info = [json.loads(zlib.decompress(bdata[off:]).decode()) for off in _sdi_offset]
    return sdi_info
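A quick usage sketch (same .ibd file as the demo below): in this tablespace the SDI lives on page 3, which is also where rec_data_cluster reads it, and sdi[1] is the record that carries the table definition.

# Usage sketch: pull the SDI out of page 3 and list the column names.
import zlib, json

with open('/data/mysql_3314/mysqldata/db1/t20230424_666.ibd', 'rb') as f:
    f.seek(16384 * 3)          # page 3: the FIL_PAGE_SDI page in this file
    sdi = sdi_page_data(f.read(16384))

for col in sdi[1]['dd_object']['columns']:
    print(col['name'])         # includes the hidden DB_TRX_ID / DB_ROLL_PTR columns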
Now that we can get the column information, we can piece the DDL back together. The code is at the end of this article (def get_ddl(data)).
Enough talk; let's see it in action.
To keep things simple, I only parse the leaf pages of the clustered index. Non-leaf pages are not parsed (I'm not querying by key anyway...), and neither are secondary indexes (only the clustered index stores the complete rows).
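Concretely, "only the leaf pages" means finding the first leaf and then following FIL_PAGE_NEXT until it is FIL_NULL; a minimal sketch of that walk (the helper name is mine; rec_data_cluster below does the same thing):

# Sketch: iterate the raw bytes of every page on the leaf level.
import struct

PAGE_SIZE = 16384
FIL_NULL = 0xFFFFFFFF            # FIL_PAGE_NEXT value meaning "no next page"

def walk_leaf_level(f, first_leaf_page_no: int):
    page_no = first_leaf_page_no
    while page_no != FIL_NULL:
        f.seek(page_no * PAGE_SIZE)
        page = f.read(PAGE_SIZE)
        yield page
        page_no = struct.unpack('>L', page[12:16])[0]   # FIL_PAGE_NEXT lives at offset 12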
As for data types, only int and varchar are supported for now; other types are left to the reader (for example, DATE is 3 bytes, with year:month:day packed as 15:4:5 bits).
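For example, here is a sketch of the DATE unpacking (the helper name is mine, and the assumption that InnoDB flips the top bit the way it does for signed INT should be verified against a real page):

# Sketch: unpack a 3-byte DATE. The packed value is year*16*32 + month*32 + day,
# i.e. day in the low 5 bits, month in the next 4, year above them.
def decode_date(b3: bytes) -> str:
    v = int.from_bytes(b3, 'big') ^ 0x800000   # undo the flipped sign bit (assumption)
    return f'{v >> 9:04d}-{(v >> 5) & 0x0F:02d}-{v & 0x1F:02d}'

Back to the demo: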
import innodb_index
filename = '/data/mysql_3314/mysqldata/db1/t20230424_666.ibd'
page_size = 16384
aa = innodb_index.rec_data_cluster(filename) # it prints the DDL by default
print(len(aa))
for x in aa[:10]:
print(x)
Then let's verify against the database.
The DDL matches.
The row count matches too.
And the first 10 rows match.
Now for something bolder: drop the original table and import what our tool produced to see what happens (don't try this in production).
python>> len(aa)
python>> with open('/tmp/t20230424_666.sql','w') as f:
python>> for x in aa:
python>> _size = f.write(x)
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'drop table t20230424_666;'
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e "
CREATE Table db1.t20230424_666(
id int not null,
name varchar(50),
addr varchar(100),
email varchar(100)
,PRIMARY KEY(id)) engine=InnoDB comment='';"
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 < /tmp/t20230424_666.sql
shell>> mysql -h192.168.101.21 -P3314 -p123456 -Ddb1 -e 'checksum table t20230424_666;'
1. Records are grouped 4 to 8 per group; the first record of each group stores the group size (owned) and has its position recorded in a page-directory slot (all of this exists for fast lookups...)
2. MySQL's int type stores the sign in the first bit by default; watch out for it when parsing (see the sketch after this list)...
3. The SDI can also be parsed with the official ibd2sdi tool
4. The tool in this article only supports some data types, and it does not parse concurrently; for large data sets you could add concurrency
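A tiny sketch for note 2 (example values are mine): the stored bytes are big-endian with the sign bit flipped, so decoding is a single subtraction.

import struct

def decode_int(b4: bytes) -> int:
    # InnoDB stores signed INT with the sign bit flipped so bytes sort correctly
    return struct.unpack('>L', b4)[0] - (1 << 31)

assert decode_int(bytes([0x80, 0x00, 0x00, 0x05])) == 5
assert decode_int(bytes([0x7F, 0xFF, 0xFF, 0xFF])) == -1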
innodb_page_type.py is in the previous article
innodb_file.py is also in the previous article (note: everything is big-endian; a few places there may mistakenly say little-endian)
innodb_index.py is as follows
# parse FIL_PAGE_INDEX
import struct
import innodb_page_type
import innodb_file
import zlib,json
PAGE_SIZE = 16384
FIL_PAGE_DATA = 38
FIL_PAGE_DATA_END = 8
PAGE_NEW_INFIMUM = 99
PAGE_NEW_SUPREMUM = 112
PAGE_DIR_SLOT_MAX_N_OWNED = 8
PAGE_DIR_SLOT_MIN_N_OWNED = 4
REC_N_FIELDS_ONE_BYTE_MAX = 0x7F # above (>) this value a 2-byte length is used (the first bit of the byte flags whether 2 bytes are used)
innodb_page_name = {}
for x in dir(innodb_page_type):
if x[:2] != '__':
innodb_page_name[getattr(innodb_page_type,x)] = x
def fseg_header(bdata):
return struct.unpack('>LLH',bdata[:10])
def read_var_len(bdata,start):
    # Read one variable-length column length whose bytes end at `start` (exclusive).
    # Lengths sit back to front in the record header; a first byte above 0x7F
    # carries the high 6 bits of a 2-byte length (bit 0x40 would flag external
    # storage, which this tool does not handle). Returns (length, new_start).
    ksize = bdata[start-1]
    start -= 1
    if ksize > REC_N_FIELDS_ONE_BYTE_MAX:
        ksize = ((ksize&0x3F)<<8)|bdata[start-1]
        start -= 1
    return ksize,start
# I couldn't find the SDI structure, but it turns out to be compressed, so brute-force decompression recovers the SDI data....
# "However, SDI data is compressed to reduce the storage footprint" https://dev.mysql.com/doc/refman/8.0/en/serialized-dictionary-information.html
# TODO: the official ibd2sdi tool can also extract this
def sdi_page_data(bdata):
    # Brute force: the SDI page layout is undocumented, so probe each offset until
    # zlib accepts a complete stream starting there. decompressobj lets us skip
    # past a stream we already found via unused_data.
    _sdi_offset = []
    i = 120 # skip the FIL header and the uncompressed head of the SDI records
    while i < 16384 and len(_sdi_offset) < 2:
        d = zlib.decompressobj()
        try:
            d.decompress(bdata[i:])
        except zlib.error:
            i += 1
            continue
        if not d.eof: # stream didn't finish: not a real stream start
            i += 1
            continue
        _sdi_offset.append(i)
        i = len(bdata) - len(d.unused_data) # jump past the stream just found
    # zlib.decompress ignores bytes after the end of a stream, so decompressing
    # from the recorded offset to the end of the page is safe.
    sdi_info = [json.loads(zlib.decompress(bdata[off:]).decode()) for off in _sdi_offset]
    return sdi_info
# build the table's DDL statement from the SDI info
def get_ddl(data):
ddl = f"CREATE {data[1]['dd_object_type']} {data[1]['dd_object']['schema_ref']}.{data[1]['dd_object']['name']}( \n"
pk_list = []
auto_key = None
    col_len = len(data[1]['dd_object']['columns']) - 2 # exclude the hidden DB_TRX_ID/DB_ROLL_PTR columns
_col_len = 0
coll = []
for col in data[1]['dd_object']['columns']:
if col['name'] in ['DB_TRX_ID','DB_ROLL_PTR']:
continue
if col['is_nullable']:
ddl += f"{col['name']} {col['column_type_utf8']}"
else:
ddl += f"{col['name']} {col['column_type_utf8']} not null"
_col_len += 1
coll.append(col['name'])
if col['is_auto_increment']:
ddl += ' auto_increment,\n'
elif _col_len < col_len:
ddl += ',\n'
else:
ddl += ' \n'
index_ddl = ''
for i in data[1]['dd_object']['indexes']:
name = "PRIMARY KEY" if i['name'] == 'PRIMARY' else f"index {i['name']}"
index_ddl += f",{name}("
for x in i['elements']:
if x['length'] < 4294967295:
index_ddl += f"{coll[x['column_opx']]},"
        index_ddl = index_ddl[:-1] # drop the trailing comma
index_ddl += ")"
ddl += index_ddl
ddl += f") engine={data[1]['dd_object']['engine']} comment='{data[1]['dd_object']['comment']}';"
return ddl
def get_rec_data(bdata,columns,index):
rdata = []
page0 = page_index(bdata)
index_ = []
for x in index['elements']:
if x['length'] < 4294967295:
index_.append(x['column_opx'])
    len_column = len(columns) - 2 # drop the hidden DB_TRX_ID(6) and DB_ROLL_PTR(7)
for x in page0.records:
rec_header = rec_extra_header(bdata[x-5:x])
if rec_header.deleted:
#print('deleted')
            continue # skip delete-marked records
tdata = [ None for x in range(len_column) ]
        null_bitmask_size = int((len_column+7)/8) # simplification: strictly the NULL bitmap only has bits for nullable columns
null_bitmask = int.from_bytes(bdata[x-null_bitmask_size-5:x-5],'big')
toffset = x-5-null_bitmask_size
doffset = x
#print(x,toffset,doffset,null_bitmask)
        # read the key (index) columns first:
        for k in index_:
            ktype = columns[k]['type']
            if ktype in [16,]: # VARCHAR: variable length
                ksize,toffset = read_var_len(bdata,toffset)
                tdata[k] = bdata[doffset:doffset+ksize].decode()
                doffset += ksize
            elif ktype in [15,]: # DATE: 3 bytes, kept as raw bytes here
                tdata[k] = bdata[doffset:doffset+3]
                doffset += 3
            elif ktype in [4,]: # INT: 4 bytes, big-endian with the sign bit flipped
                _t = struct.unpack('>L',bdata[doffset:doffset+4])[0]
                tdata[k] = _t - (1<<31) # undo the flipped sign bit
                doffset += 4
        doffset += 6 + 7 # skip the hidden DB_TRX_ID (6 bytes) and DB_ROLL_PTR (7 bytes)
        # walk the remaining (non-key) columns
        for k in range(len_column):
            if k in index_: # key columns were already read above
                continue
            if null_bitmask&(1<<k): # NULL column: nothing stored in the row
                continue
            ktype = columns[k]['type']
            #print(columns[k]['name'],toffset)
            if ktype in [16,]: # VARCHAR: variable length
                ksize,toffset = read_var_len(bdata,toffset)
                tdata[k] = bdata[doffset:doffset+ksize].decode()
                doffset += ksize
            elif ktype in [15,]: # DATE: 3 bytes, kept as raw bytes here
                tdata[k] = bdata[doffset:doffset+3]
                doffset += 3
            elif ktype in [4,]: # INT: 4 bytes, big-endian with the sign bit flipped
                _t = struct.unpack('>L',bdata[doffset:doffset+4])[0]
                tdata[k] = _t - (1<<31) # undo the flipped sign bit
                doffset += 4
rdata.append(tdata)
#break
return rdata
# column type codes as stored in the SDI (dd::enum_column_types): 4=INT(LONG) 15=DATE(NEWDATE) 16=VARCHAR
def rec_data_cluster(filename): # requires an explicit primary key on the table
    """
    filename: path to the .ibd file
    """
f = open(filename,'rb')
    f.seek(PAGE_SIZE*3,0) # page 3 holds the SDI in this tablespace
sdi_info = sdi_page_data(f.read(PAGE_SIZE))
ddl = get_ddl(sdi_info)
print(ddl)
db_table = f'{sdi_info[1]["dd_object"]["schema_ref"]}.{sdi_info[1]["dd_object"]["name"]}'
index = sdi_info[1]['dd_object']['indexes'][0]
columns = sdi_info[1]['dd_object']['columns']
root_page = int(index['se_private_data'].split('root=')[1].split(';')[0])
#f.seek(PAGE_SIZE*root_page,0)
    # We only need to find the first leaf page, then parse the leaf level from there (the inode page records the first leaf page, after all.....)
f.seek(2*PAGE_SIZE,0)
inode = innodb_file.inode(f.read(PAGE_SIZE)[38:PAGE_SIZE-8])
leaf_page = 0
for x in inode.index:
if x['no_leaf'] == root_page:
leaf_page = x['leaf']
break
rdata = [] #[(col1,col2),(col1,col2)]
next_page_number = leaf_page
while True:
        if next_page_number == 4294967295: # FIL_NULL: no next page
break
#print('PAGE_NUM:',next_page_number)
f.seek(next_page_number*PAGE_SIZE,0)
page0 = page(f.read(PAGE_SIZE))
next_page_number = page0.FIL_PAGE_NEXT
rdata += get_rec_data(page0.bdata,columns,index)
#break
rdata_sql = []
    for x in rdata:
        _v = ''
        for j in x:
            if j is None: # column was NULL (or a type this tool doesn't parse)
                _v += 'NULL,'
            elif isinstance(j,int):
                _v += f'{j},'
            else:
                _v += f'"{j}",'
        _v = _v[:-1] # drop the trailing comma
        _sql = f'insert into {db_table} values({_v});'
        rdata_sql.append(_sql)
return rdata_sql
#storage/innobase/rem/rec.h
REC_INFO_MIN_REC_FLAG = 0x10
REC_INFO_DELETED_FLAG = 0x20
REC_N_OWNED_MASK = 0xF
REC_HEAP_NO_MASK = 0xFFF8
REC_NEXT_MASK = 0xFFFF
#REC_STATUS_ORDINARY 0
#REC_STATUS_NODE_PTR 1
#REC_STATUS_INFIMUM 2
#REC_STATUS_SUPREMUM 3
class rec_extra_header(object):
def __init__(self,bdata):
if len(bdata) != 5:
return False
fb = struct.unpack('>B',bdata[:1])[0]
        self.deleted = True if fb&REC_INFO_DELETED_FLAG else False # delete-marked?
self.min_rec = True if fb&REC_INFO_MIN_REC_FLAG else False #if and only if the record is the first user record on a non-leaf
        self.owned = fb&REC_N_OWNED_MASK # >0 means this record is the group's first, i.e. its address is stored in the page directory
        self.heap_no = (struct.unpack('>H',bdata[1:3])[0]&REC_HEAP_NO_MASK)>>3 # heap number: 0 infimum, 1 supremum, others ordinary records
        self.record_type = struct.unpack('>H',bdata[1:3])[0]&((1<<3)-1) # 0: rec 1: node pointer (non-leaf) 2: infimum 3: supremum
self.next_record = struct.unpack('>H',bdata[3:5])[0]
def __str__(self):
return f'deleted:{self.deleted} min_rec:{self.min_rec} owned:{self.owned} heap_no:{self.heap_no} record_type:{self.record_type} next_record:{self.next_record}'
class page(object):
def __init__(self,bdata):
if len(bdata) != PAGE_SIZE:
return None
self.FIL_PAGE_SPACE_OR_CHKSUM, self.FIL_PAGE_OFFSET, self.FIL_PAGE_PREV, self.FIL_PAGE_NEXT, self.FIL_PAGE_LSN, self.FIL_PAGE_TYPE, self.FIL_PAGE_FILE_FLUSH_LSN = struct.unpack('>4LQHQ',bdata[:34])
self.FIL_PAGE_SPACE_ID = struct.unpack('>L',bdata[34:38])[0]
        self.CHECKSUM, self.FIL_PAGE_LSN_LOW32 = struct.unpack('>2L',bdata[-8:]) # trailer: old-style checksum + low 32 bits of the page LSN
self.bdata = bdata
def fil_header(self):
return f'PAGE_SPACE_ID:{self.FIL_PAGE_SPACE_ID} PAGE_TYPE:{innodb_page_name[self.FIL_PAGE_TYPE]} PREV:{self.FIL_PAGE_PREV} NEXT:{self.FIL_PAGE_NEXT}'
def fil_trailer(self):
        return f'CHECKSUM:{self.CHECKSUM} PAGE_LSN:{self.FIL_PAGE_LSN_LOW32}'
class page_index(page):
def __init__(self,bdata):
super().__init__(bdata)
        self.cols = [] # list of column types.
#PAGE_HEADER
bdata = self.bdata[FIL_PAGE_DATA:PAGE_SIZE-FIL_PAGE_DATA_END]
self.PAGE_N_DIR_SLOTS, self.PAGE_HEAP_TOP, self.PAGE_N_HEAP, self.PAGE_FREE, self.PAGE_GARBAGE, self.PAGE_LAST_INSERT, self.PAGE_DIRECTION, self.PAGE_N_DIRECTION, self.PAGE_N_RECS, self.PAGE_MAX_TRX_ID, self.PAGE_LEVEL, self.PAGE_INDEX_ID = struct.unpack('>9HQHQ',bdata[:36])
self.PAGE_BTR_SEG_LEAF = fseg_header(bdata[36:46])
self.PAGE_BTR_SEG_TOP = fseg_header(bdata[46:56])
        # PAGE_DIRECTORY (each 2-byte slot points at the record data, not including the 5-byte header in front of it, i.e. REC_N_NEW_EXTRA_BYTES)
page_directorys = []
for x in range(int(PAGE_SIZE/2)):
tdata = struct.unpack('>H',self.bdata[-(2+FIL_PAGE_DATA_END+x*2):-(FIL_PAGE_DATA_END+x*2)])[0]
page_directorys.append(tdata)
if tdata == PAGE_NEW_SUPREMUM:
                break # the last slot points at the supremum, so the slot walk is done
self.page_directorys = page_directorys
#RECORDS(PAGE_DATA) innodb_default_row_format
        offset = self.page_directorys[0] # first slot: the infimum pseudo-record
records = []
        while True:
            record_type = self.bdata[offset-3]&((1<<3)-1) # status = low 3 bits: 0 ordinary(leaf) 1 node pointer 2 infimum 3 supremum
            if record_type == 3: # reached the supremum: all user records visited
                break
            records.append(offset)
            # next_record is a signed 16-bit relative offset; keep the sum inside the 16-bit space
            offset = (offset + struct.unpack('>H',self.bdata[offset-2:offset])[0])&0xFFFF
        records.remove(PAGE_NEW_INFIMUM) # drop the infimum (a pseudo-record, not real data)
self.records = records
    def get_record(self,rec_offset):
        """
        return the record at the given offset
        """
        pass
    def find_data_with_index(self,index_value):
        """
        find data by index value: locate the page, then binary-search the records via the slots
        """
        pass
def record(self):
return f'RECORDS:{len(self.records)}'
def page_header(self):
return f'SLOTS:{self.PAGE_N_DIR_SLOTS} PAGE_LEVEL:{self.PAGE_LEVEL} INDEX_ID:{self.PAGE_INDEX_ID} RECORDS:{self.PAGE_N_RECS} PAGE_HEAP_TOP:{self.PAGE_HEAP_TOP} PAGE_GARBAGE(deleted):{self.PAGE_GARBAGE} PAGE_FREE:{self.PAGE_FREE}'
def page_directory(self):
return f'SLOTS:{len(self.page_directorys)} MAX:{self.page_directorys[-1:]} MIN:{self.page_directorys[:1]}'
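To poke at a single page with the classes above, a quick sketch (page 4 is my guess at the clustered-index root of a small, fresh table; rec_data_cluster reads the real root page number from the SDI):

import innodb_index

PAGE_SIZE = 16384
with open('/data/mysql_3314/mysqldata/db1/t20230424_666.ibd', 'rb') as f:
    f.seek(4 * PAGE_SIZE)                     # assumed root page
    p = innodb_index.page_index(f.read(PAGE_SIZE))

print(p.fil_header())
print(p.page_header())
print(p.page_directory())
print(p.record())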