对目录打包(归档),我们一般使用tar之类的命令来做, 原生的tar命令是单线程的, 也不支持加密操作. 当然可以借助第三方工具来实现, 但都使用第三方工具了, 为何不自己写一个呢.
归档: 将目标文件(不常用的)放入某个文件(集中起来保存)
压缩: 提取关键字,用更少的数据保存更多的数据, 就是节省空间. (varchar在某种程度上也算节省了空间). 本文不会讨论哈夫曼之类的算法的, 太复杂了.
加密: 原始的数据即使归档之后, 也是会被看出来内容的, 如果你不想被别人看到里面是啥, 就可以给数据加密...., 加密比较耗费cpu资源, 这取决于加密算法.
本文较长, 不感兴趣的可以直接跳到结尾的 压测结果比较
将目标文件和目录信息记录下来(HEADER), 本次测试只记录了文件名和目录名字和大小. 没有记录权限(懒...)
然后将目标文件拆分为指定大小的块 存储在 指定的文件里面(BODY).
为了解压的时候快一点, 可以将每个块对应的文件ID和offset也记录下来(FOOTER)
header格式如下
名字 | 大小(byte) | 描述 |
---|---|---|
header_size | 4 | header的大小 |
total_size | 8 | 源文件的总大小(所有文件加起来,不含目录) |
block_size | 4 | 块大小(body是按照block存储的) |
compr | 1 | 是否压缩 |
encryption | 1 | 是否加密 |
fast_extract | 8 | 是否有footer |
lbdirs | 4 | 目录信息长度 |
lbfiles | 4 | 文件信息长度 |
lbfilesize | 4 | 文件大小长度 |
bodysize | 8 | body大小(未使用) |
dirs | 取决于lbdirs | 目录信息,为list, index值作为file_id |
files | 取决于lbfilesize | 文件信息 |
filesize | 取决于bodysize | 文件大小信息 |
body由若干个block组成, 每个block是取源文件的block_size(默认256MB)大小, 受加密和压缩影响, 实际存储大小并不等于block_size
block格式如下
名字 | 大小 | 描述 |
---|---|---|
lbdata | 4 | 本块的数据长度(不含fileid和block_id) |
fileid | 4 | 本块属于哪个文件(files对应的索引值) |
block_id | 4 | 本块属于哪个文件的哪个块(offset = block_id*block_size) |
data | 取决于lbdata | 数据(压缩加密后的数据) |
注: 如果既有压缩也有加密的话, 是先压缩后加密(节省点CPU)
默认均使用压缩, 测试内容就是mysql.tar.gz
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 4 #默认为4
aa.archive() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
再解压看下效果吧, 不然光压缩了, 解压不了,有屁用....
命令和压缩一样, 只是调用的是extract函数
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 4 #默认为4
aa.extract() #解压
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
解压后的文件权限来自umask, 和之前的权限可能不一样. 需要注意下, 当然你也可以把权限信息也记录上去.
加密的话, 直接初始化的时候指定密码就行
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64','123456')
aa.FORCE = True #文件存在就覆盖
aa.archive() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
试下解压
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64','123456')
aa.FORCE = True #文件存在就覆盖
aa.extract() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 16
aa.archive() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
解压呢
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 16
aa.extract() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64','123456')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 16
aa.archive() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
解压
import time
import ddcw_tar
starttime = time.time()
aa = ddcw_tar.archce('/tmp/t20230419.ddcw.tar','/tmp/mysql-5.7.41-linux-x86_64','123456')
aa.FORCE = True #文件存在就覆盖
aa.parallel = 16
aa.extract() #压缩
endtime = time.time()
print(f'COST TIME {round((endtime-starttime),1)} seconds')
time tar -zcf t20230419.tar.gz mysql-5.7.41-linux-x86_64
试下解压呢
time tar -xf t20230419.tar.gz
原始文件1.4GB
tar -zcf (-z就是使用的gzip)压缩后为 383MB
自制压缩(其实使用的是zlib) 压缩后为383MB
当然python可以使用gzip压缩. 修改如下代码即可
显然自制的ddcw_tar效果更好, 但是没有记录权限又是一个败笔....
对象 | 并发 | 加密 | 压缩时间(秒) | 解压时间(秒) |
---|---|---|---|---|
tar -z | 1 | 否 | 59.0 | 8.4 |
ddcw_tar | 4 | 否 | 21.6 | 4.0 |
ddcw_tar | 4 | 是 | 53.2 | 32.7 |
ddcw_tar | 16 | 否 | 20.3 | 6.4 |
ddcw_tar | 16 | 是 | 49.4 | 37.8 |
1. 目标文件大小差不多的时候, 并发的优势明显, 目标文件仅个别大文件的时候, 并发优势就不那么明显(小文件跑完了, 就差大文件了....)
2. ddcw_tar虽然是自制的工具, 又并发, 但是没有记录权限, 这是需要完善的地方.
3. 解压的时候, 并发数增加了, 但是解压时间也增加了一丢丢, 所以并不是并发数量越多越好.(IO也是瓶颈)
要记录权限之类的,请自己实现(也可以把文件大小换成权限, 反正文件大小我记录了,也没有使用....)
#归档 压缩 加密
import struct
from threading import Thread
import os
import zlib
import json
def get_dirs_files(dirname,followlinks=False):#followlinks:True 递归访问符号连接
dirs = []
files = []
for rootname,dirlist,filelist in os.walk(dirname):
dirs += [ os.path.join(rootname,name) for name in dirlist ]
files += [ os.path.join(rootname,name) for name in filelist ]
dirs = set(dirs)
files = set(files)
return list(dirs),list(files),[ os.path.getsize(x) for x in files ]
#list/set 转二进制对象(str就2字节表示大小, int就统统8字节)
def list_to_bin(data,isstr=True):
bdata = b''
if isstr:
for x in data:
bdata += struct.pack('<H',len(x))+x.encode()
else:
for x in data:
bdata += struct.pack('<Q',x)
return bdata
def bin_to_list(bdata,isstr=True):
data = []
lbdata = len(bdata)
i = 0
if isstr:
while i < lbdata:
dl = struct.unpack('<H',bdata[i:i+2])[0]
i += 2
data.append(bdata[i:i+dl].decode())
i += dl
else:
formatpack = f'<{int(lbdata/8)}Q'
data = struct.unpack(formatpack,bdata)
return data
#加密: 可以使用之前写的加密工具 https://cloud.tencent.com/developer/article/2256534
def encrypt(bdata,password):
bdata = bytearray(bdata)
password = bytearray(password)
lbdata = len(bdata)
lpassword = len(password)
for x in range(lbdata):
bdata[x] ^= password[x%lpassword]
return bdata
#这里我就偷懒了, 直接用xor -_- ..
def decrypt(bdata,password):
return encrypt(bdata,password)
#压缩
def compress(bdata):
return zlib.compress(bdata)
#解压
def uncompress(bdata):
return zlib.decompress(bdata)
class archce(object):
def __init__(self,filename,target,password=None):
"""
parameter 0 压缩后的文件名
parameter 1 要压缩或者解压的目录(多个目录就使用list/set)
parameter 2 加密/解密的密码
每个文件拆分成N个block(加密压缩)后存储
HEADER: header_size:4byte total_size:8byte block_size:4byte crc32:1byte encryption:1byte fast_extract:8byte file_dir:obj
BODY: body_size:8byte block: datasize:4byte(不含fileid和blockid) fileid:4byte blockid:4byte data #block_id*block_size = offset
FOOTER: [(fileid,(offset))] #快速恢复
"""
self.filename = filename
self.target = target
self.password = password
#self.encryption = False #默认不使用加密
self.encryption = True if password is not None else False #有密码就加密
self.block_size = 256*1024*1024 #默认每个块256MB 最大支持4GB(32bit)
self.crc32 = False #懒得整crc校验了....
self.compress = True #默认启用压缩
self.parallel = 4 #并发
self.fast_extract = True #快速解压, 就是在文件末尾存储 文件和相关的位置....
self.FORCE = False
def get_files(self):
dirs,files,filesize = [],[],[]
if isinstance(self.target,list) or isinstance(self.target,tuple):
for name in self.target:
if os.path.exists(name):
if os.path.isfile(name):
files += [name]
continue
if os.path.isdir(name):
dirs += [name]
a,b,c = get_dirs_files(name)
dirs += a
files += b
filesize += c
else:
if self.FORCE:
print(f'{name} dose not exists. and will continue')
else:
print(f'{name} dose not exists.')
exit(2)
dirs,files,filesize = list(set(dirs)), list(set(files)), list(set(filesize))
else:
dirs,files,filesize = get_dirs_files(self.target)
return dirs,files,filesize
def archive(self): #归档
#header
dirs,files,filesize = self.get_files()
self.file_list = files
if os.path.exists(self.filename) and not self.FORCE:
return f'{self.filename} exist.'
total_size = sum(filesize)
block_size = self.block_size
compr = 1 if self.compress else 0
encryption = 1 if self.encryption else 0
fast_extract = 0
bdirs = list_to_bin(dirs)
bfiles = list_to_bin(files)
bfilesize = list_to_bin(filesize,False)
if compr == 1:
bdirs = compress(bdirs)
bfiles = compress(bfiles)
bfilesize = compress(bfilesize)
if encryption == 1:
bdirs = encrypt(bdirs,str(self.password).encode())
bfiles = encrypt(bfiles,str(self.password).encode())
bfilesize = encrypt(bfilesize,str(self.password).encode())
header = struct.pack('<QLBBQLLLQ',total_size,block_size,compr,encryption,fast_extract,len(bdirs),len(bfiles),len(bfilesize),0) #留了8字节来记录body size
header += bdirs + bfiles + bfilesize
header = struct.pack('<L',len(header)) + header
with open(self.filename,'wb') as f:
f.write(header)
_tmp_files = [ (x,files[x]) for x in range(len(files)) ]
#f = open(self.filename,'ab')
pc = {}
for x in range(self.parallel):
pc[x] = Thread(target=self.work0,args=(x,self.filename,block_size,compr,encryption,_tmp_files))
for x in range(self.parallel):
pc[x].start()
for x in range(self.parallel):
pc[x].join()
#print('complete')
#f.close()
total_file_size = os.path.getsize(self.filename)
if self.fast_extract:
footer = {}
with open(self.filename,'rb') as f:
header_size = struct.unpack('<L',f.read(4))[0]
header = f.read(header_size)
#print(header_size)
while True:
_tdata = f.read(12)
if _tdata == b'':
break
filesize,fileid,blockid = struct.unpack('<LLL',_tdata)
if fileid not in footer:
footer[fileid] = []
footer[fileid].append((blockid,f.tell(),filesize))
f.seek(filesize,1)
if f.tell() == total_file_size:
break
f = os.open(self.filename, os.O_WRONLY|os.O_CREAT)
os.lseek(f, 18, 0)
os.write(f,struct.pack('<Q',total_file_size))
os.fsync(f)
os.close(f)
footer = json.dumps(footer).encode()
with open(self.filename,'ab') as f:
f.write(footer)
print('write footer complete')
#return footer,total_file_size
def work0(self,x,filename,block_size,compr,encryption,_tmp_files):
f = open(filename,'ab')
while True:
try:
_fileid,_filename = _tmp_files.pop()
print(f'Process {x} archive file {_filename}')
except Exception as e:
#print(e)
break
_tf = open(_filename,'rb')
_block_id = 0 #block_id
while True:
_bdata = _tf.read(block_size)
if _bdata == b'' and _block_id != 0: #空文件也记录下
break
if compr == 1:
_bdata = compress(_bdata)
if encryption == 1:
_bdata = encrypt(_bdata,str(self.password).encode())
_lbdata = len(_bdata)
_bdata = struct.pack('<LLL',_lbdata, _fileid, _block_id,) + _bdata
status = f.write(_bdata)
#print(f'{x} {_block_id} {status} wirte OK')
_block_id += 1
_tf.close()
f.close()
def extract(self):
if not isinstance(self.target,str):
return f'{self.target} must be str'
total_size,block_size,compr,encryption,dirs,files,filesize,_footer = self.file_header()
self.file_list = files
_footer = [ [x,_footer[x]] for x in _footer ]
if encryption == 1 and self.password is None:
return False
for x in dirs:
print(f'create dir {x}')
#os.makedirs(x,exist_ok=self.FORCE)
os.makedirs(x,exist_ok=True)
pc = {}
for x in range(self.parallel):
pc[x] = Thread(target=self.work1,args=(x,_footer,block_size,compr,encryption,)) #filename: files[_footer[n][0]] offset:files[_footer[n][1]]
for x in range(self.parallel):
pc[x].start()
for x in range(self.parallel):
pc[x].join()
def work1(self,x,_footer,block_size,compr,encryption,):
_f = open(self.filename,'rb')
while True:
try:
fileid,file_detail = _footer.pop()
filename = self.file_list[int(fileid)]
print(f'write file {filename}')
except:
return
with open(filename,'wb') as f:
for x in file_detail:
loffset = x[0]*block_size
_offset = x[1]
_filesize = x[2]
_f.seek(_offset,0)
bdata = _f.read(_filesize)
if encryption == 1:
bdata = decrypt(bdata,str(self.password).encode())
if compr == 1:
bdata = uncompress(bdata)
f.seek(loffset,0)
f.write(bdata)
_f.close()
def file_header(self):
if not os.path.exists(self.filename):
return f'no file {self.filename}'
with open(self.filename,'rb') as f:
header_size = struct.unpack('<L',f.read(4))[0]
total_size,block_size,compr,encryption,fast_extract,lbdirs,lbfiles,lbfilesize,bodysize = struct.unpack('<QLBBQLLLQ',f.read(8+4+1+1+8+4+4+4+8))
dirs = f.read(lbdirs)
files = f.read(lbfiles)
filesize = f.read(lbfilesize)
if encryption == 1:
dirs = decrypt(dirs,str(self.password).encode())
files = decrypt(files,str(self.password).encode())
filesize = decrypt(filesize,str(self.password).encode())
if compr == 1:
dirs = uncompress(dirs)
files = uncompress(files)
filesize = uncompress(filesize)
dirs = bin_to_list(dirs)
files = bin_to_list(files)
filesize = bin_to_list(filesize,False)
if fast_extract > 0:
f.seek(fast_extract,0)
_footer = json.loads(f.read().decode())
else:
_footer = None
return total_size,block_size,compr,encryption,dirs,files,filesize,_footer
为啥写这篇文章呢,主要是我最近比较浮躁, 写写python静静心 -_-
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。