前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MYSQL连接协议解析 并使用PYTHON模拟连接

MYSQL连接协议解析 并使用PYTHON模拟连接

原创
作者头像
大大刺猬
修改2023-03-19 21:07:15
1.6K0
修改2023-03-19 21:07:15
举报
文章被收录于专栏:大大刺猬大大刺猬

本文主要讲mysql连接协议.

了解了mysql的连接协议后, 就可以直接写mysql连接(驱动)了, 就可以模拟mysql client去连接数据库了, 还能模拟mysql服务端, 就可以制作mysql中间件来做读写分离, 分布式数据库 之类的了. 不过本文不会讲到那么多.

COM_QUERY下次讲.

读前须知:

本文使用的密码加密策略为 NativePassword

本文不使用SSL

mysql包格式如下

名字

大小(字节)

描述

payload_length

3

包长度(2**(3*8)) 所以最大就是16MB (0xFFFFFF表示包超过16MB, 继续读)

sequence_id

1

序列号(0-255)

payload

具体的包了

MYSQL连接过程

过程如下:

client 连接 server (socket.connect())

server 发送握手协议(包括数据库版本, 加密策略,capability_flags, salt等信息) (HandshakeV10)

client 回复账号密码等信息 (HandshakeResponse41)

server回复 OK/ERR 包, 如果是err包, 客户端和服务端就会断开连接. OK包就进入命令解析阶段(下章讲)

连接详情(含py)

connect

客户端直接建立socket连接即可. (本文不含本地socket, 均走TCP)

执行如下py代码连接mysql后, 服务端就会发送handshake包(扫描服务器版本就可以使用这种方法,这一步不要账号密码)

代码语言:python
复制
import socket
sock = socket.create_connection(('192.168.101.21',3308)) #比socket.scoket好用些
sock.settimeout(None)
rf = sock.makefile("rb") #把网络映射为IO, 方便写, 也可以不要. 就使用recv/send
seq_id = 0 #包的序号

HandshakeV10

本文解析的handshake为v10版本.

格式如下

名字

大小(字节)

条件

描述

protocol version

1

v10版本固定为10 (b'\n')

server version

以0x00结尾

mysql版本,字符串

thread id

4

server端对应的thread id

salt

8

加密需要的(scramble)

filler

1

固定 0x00

capability_flags_1

2

功能标记,比如带不带DB

character_set

1

字符集

status_flags

2

server状态,比如是否自动提交

capability_flags_2

2

类似capability_flags_1

auth_plugin_data_len

1

capabilities & CLIENT_PLUGIN_AUTH

salt大小

reserved

10

固定10个空字符(填充)

auth-plugin-data-part-2

MAX(13, length of auth-plugin-data - 8)

salt的剩余部分(毕竟上面只有8字节, 一般是20字节)

auth_plugin_name

剩下的字节

密码插件名字

salt

mysql的NativePassword加密策略虽然是做两次 sha1 就行, 但是网络传说中不安全(会被截获), 所以服务端每次都会生成随机的salt给密码加密. 就是加盐...

注意:每次连接的salt都不一样

capability_flags

客户端服务端通用的, 固定4(2+2)字节(32bit) , 每个bit位代表一个, 比如第九位代表CLIENT_PROTOCOL_41

本文客户端使用CAPABILITIES = (

LONG_PASSWORD

| LONG_FLAG

| PROTOCOL_41

| TRANSACTIONS

| SECURE_CONNECTION

| MULTI_RESULTS

| PLUGIN_AUTH

| PLUGIN_AUTH_LENENC_CLIENT_DATA

| CONNECT_ATTRS

)

即3842565, 换成bit就是如下

代码语言:python
复制
#[ 1 if 3842565 & ( 1 << x ) else 0 for x in range(32) ]
[1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

character_set

就是字符编号, 可以在mysql服务端查询编号, 比如45表示utf8mb4

代码语言:sql
复制
select id, character_set_name, collation_name, is_default from information_schema.collations order by id;

auth_plugin_name

就是加密插件名字, 本文使用mysql_native_password

HandshakeResponse41

本文解析的为HandshakeResponse41

格式如下

名字

大小(字节)

条件

描述

client_flag

4

capability_flags 同server

max_packet_size

4

最大包大小, 默认16MB

character_set

1

字符集

filler

23

填充字符

username

以0x00结尾

用户名

auth_response

取决于密码长度

capabilities & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA

密码长度(使用变成类型)和密码, 使用sha1加盐

dbname

0x00

capabilities & CLIENT_CONNECT_WITH_DB(就是capabilities 中的DB位是否为1, 就是有没有设置DB的意思)

数据库名(本文不含)

client_plugin_name

0x00

capabilities & CLIENT_PLUGIN_AUTH

加密插件名字(以空0x00字符结尾)

attrs

变长长度

CLIENT_CONNECT_ATTRS

客户端名字,版本,pid信息

auth_response

密码长度+密码

密码长度使用变长类型, 可参考上一章

密码是加密的, 可使用如下函数加密, 也可以使用官方的c代码(sql/auth/password.c::scramble)

代码语言:python
复制
#参考的pymysql
#之前的巡检脚本也有关于native_password加密的, sql参考:select CONCAT('*', UPPER(SHA1(UNHEX(SHA1('password')))));
import hashlib
def native_password(password,salt):
	stage1 = hashlib.sha1(password).digest()
	stage2 = hashlib.sha1(stage1).digest()
	
	rp = hashlib.sha1(salt)
	rp.update(stage2)
	result = bytearray(rp.digest())
	
	for x in range(len(result)):
		result[x] ^= stage1[x]
	return result

capabilities

比如你要设置 数据库的话, 就

self.server_capabilities & MYSQL_CONNECT_WITH_DB #与MYSQL_CONNECT_WITH_DB (1 << 3)相与就是设置第N位为1

OK/ERR

Ok包就是 第一字节为 0x00

err包就是 第一字节为 0xfe

验证查看

上面已经解析了mysql的连接过程了, 这里就使用python连接看看

bytes([self._next_seq_id]) 写成了bytes(self._next_seq_id) 坑了我2小时...... 一致报Got packets out of order.....

代码语言:javascript
复制
import testpymysql
aa = testpymysql.mysql()
aa.connect()

显示没问题, 去服务端瞧瞧, 也没得问题, 信息都是对得上的, 说明我们解析mysql连接协议成功了. 下章在讲发送SQL命令

总结

1. mysql包 分为header(3+1)和payload

2. 当连上mysql的时候, mysql就会发送它的版本信息和salt过来

3. 客户端根据salt把密码加密发送过去, 如果成功就返回OK包, 失败就返回err包

代码语言:javascript
复制
Client ->> Server : connect()
Server -->> Client : Handshake
Client ->> Server : HandshakeResponse
Server -->> Client : OK/ERR pack

mysql的字符常量均为 0x00结尾. 可变长字符 均为: 变长长度+值.

参考资料

mysql官方: https://dev.mysql.com/doc/dev/mysql-server/latest/

pymsyql: https://github.com/PyMySQL/PyMySQL

一个有趣的实验

上面已经解析了mysql的连接了, 那么我们就可以模拟mysql服务端了

测试代码链接: https://github.com/ddcw/ddcw/blob/master/python/mysql_joker.py

编译为二进制更方便整人.... (不建议用哈).
编译为二进制更方便整人.... (不建议用哈).

然后客户端使用mysql连接测试, 就出现了password is not exists. will drop all database.

JK
JK

附完整源码

代码语言:python
复制
import hashlib
import socket
import struct
import os


#来自pymysql 变长字段
def _lenenc_int(i):
	if i < 0:
		raise ValueError("Encoding %d is less than 0 - no representation in LengthEncodedInteger" % i)
	elif i < 0xFB:
		return bytes([i])
	elif i < (1 << 16):
		return b"\xfc" + struct.pack("<H", i)
	elif i < (1 << 24):
		return b"\xfd" + struct.pack("<I", i)[:3]
	elif i < (1 << 64):
		return b"\xfe" + struct.pack("<Q", i)
	else:
		raise ValueError("Encoding %x is larger than %x - no representation in LengthEncodedInteger"% (i, (1 << 64)))

def native_password(password,salt):
	stage1 = hashlib.sha1(password).digest()
	stage2 = hashlib.sha1(stage1).digest()
	
	rp = hashlib.sha1(salt)
	rp.update(stage2)
	result = bytearray(rp.digest())
	
	for x in range(len(result)):
		result[x] ^= stage1[x] #异或, 不一样则为1
	return result


def btoint(bdata,t='little'):
        return int.from_bytes(bdata,t)

class mysql(object):
	def __init__(self):
		self.host = '192.168.101.21'
		self.port = 3308
		self.user = 'root'
		self.password = '123456'

	def read_pack(self,):
		pack_header = self.rf.read(4)
		btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
		pack_size = btrl + (btrh << 16)
		self._next_seq_id = (self._next_seq_id + 1) % 256 
		bdata = self.rf.read(pack_size) #也懒得考虑超过16MB的包了
		return bdata

	def write_pack(self,data):
		#3字节长度, 1字节seq, data
		bdata = struct.pack("<I", len(data))[:3] + bytes([self._next_seq_id]) + data
		self.sock.sendall(bdata)
		self._next_seq_id = (self._next_seq_id + 1) % 256

	def handshake(self,bdata):
		i = 0 #已经读取的字节数, 解析binlog的时候也是这么用的.....
		protocol_version = bdata[:1] #只解析10

		server_end = bdata.find(b"\0", i)
		self.server_version = bdata[i:server_end]
		i = server_end + 1

		self.thread_id = btoint(bdata[i:i+4])
		i += 4

		self.salt = bdata[i:i+8]
		i += 9 #还有1字节的filter, 没啥意义,就不保存了

		self.server_capabilities = btoint(bdata[i:i+2])
		i += 2

		self.server_charset = btoint(bdata[i:i+1])
		i += 1

		self.server_status = btoint(bdata[i:i+2])
		i += 2
		
		self.server_capabilities |= btoint(bdata[i:i+2]) << 16 #往左移16位 为啥不把capability_flags_1和capability_flags_2和一起呢
		i += 2

		salt_length = struct.unpack('<B',bdata[i:i+1])[0] #懒得去判断capabilities & CLIENT_PLUGIN_AUTH了
		salt_length = max(13,salt_length-8) #前面已经有8字节了
		i += 1

		i += 10 #reserved

		self.salt += bdata[i:i+salt_length]
		i += salt_length

		self.server_plugname = bdata[i:]

	def HandshakeResponse41(self,):
		client_flag = 3842565 #不含DBname   
		#client_flag |= 1 << 3

		charset_id = 45 #45:utf8mb4  33:utf8

		#bdata = client_flag.to_bytes(4,'little') #其实应该最后在加, 毕竟还要判断很多参数, 可能还需要修改, 但是懒
		bdata = struct.pack('<iIB23s',client_flag,2**24-1,charset_id,b'')

		bdata += self.user.encode() + b'\0'
		
		auth_password = native_password(self.password.encode(), self.salt[:20])
		auth_response = _lenenc_int(len(auth_password)) + auth_password 
		bdata += auth_response

		bdata += b"mysql_native_password" + b'\0'

		#本文有设置连接属性, 主要是为了方便观察
		attr = {'_client_name':'ddcw_for_pymysql', '_pid':str(os.getpid()), "_client_version":'0.0.1',}
		#key长度+k+v长度+v
		connect_attrs = b""
		for k, v in attr.items():
			k = k.encode()
			connect_attrs += _lenenc_int(len(k)) + k
			v = v.encode()
			connect_attrs += _lenenc_int(len(v)) + v
		bdata += _lenenc_int(len(connect_attrs)) + connect_attrs
		self.write_pack(bdata)
			
		auth_pack = self.read_pack() #看看是否连接成功
		if auth_pack[:1] == b'\0':
			print('OK',)
		else:
			print('FAILED',auth_pack)
		
		

	def connect(self):
		sock = socket.create_connection((self.host, self.port))
		sock.settimeout(None)
		self.sock = sock
		self.rf = sock.makefile("rb")
		self._next_seq_id = 0

		#解析server的握手包
		bdata = self.read_pack()
		self.handshake(bdata)

		#握手.发账号密码
		self.HandshakeResponse41()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 读前须知:
  • MYSQL连接过程
  • 连接详情(含py)
    • connect
      • HandshakeV10
        • 格式如下
        • salt
        • capability_flags
        • character_set
        • auth_plugin_name
      • HandshakeResponse41
        • 格式如下
        • auth_response
        • capabilities
      • OK/ERR
      • 验证查看
      • 总结
      • 一个有趣的实验
      • 附完整源码
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档