首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >连接MYSQL后, 怎样发送SQL和接收数据? (含python)

连接MYSQL后, 怎样发送SQL和接收数据? (含python)

原创
作者头像
大大刺猬
发布2023-03-20 14:15:53
1.5K0
发布2023-03-20 14:15:53
举报
文章被收录于专栏:大大刺猬大大刺猬

如果你看了 上一章 , 那你应该就明白了Mysql连接的时候都干了啥, 但是光连上也没啥用啊, 要发送SQL,接收server发来的数据.

本文主要就讲mysql客户端服务端发送数据过程(仅COM_QUERY)

发送SQL(COM_QUERY)

发送sql比较简单, 直接就是 包头加sql就行了....

名字

大小(字节)

描述

payload_length

3

包大小

sequence_id

1

序列号,每个com都重置为0

com_query

1

0x03

sql

剩余的大小

sql

bdata = struct.pack('<IB',len(sql)+1,0x03) #I:每个com_query的seq_id都从0开始,第4字节固定为0, 所以直接用I, +1:com_query占>用1字节,  0x03:com_query
bdata += sql.encode()
self.sock.sendall(bdata) #直接send就行
self._next_seq_id = 1 #下一个包seq_id = 1

接收数据

当mysql执行完SQL后, 就会返回相关的数据

流程

完整流程如下, 本次环境不考虑特殊情况

不考虑0xFF(error) 0xFB(字段太多) 0x00(无返回数据,就是成功)

所以实际上的过程就是如下

Client ->> Server : SQL
Server ->> Client : 字段数量
Server -->> Client : 具体的字段(每个字段一个包)
Server ->> Client : EOF(warnings)
Server -->> Client : 数据(每行数据一个包)
Server ->> Client : EOF(同上)

下面具体讲讲, 字段数量包就一个字节,没啥好说的, 本文也没有考虑字段多的情况

字段包

本文使用的ColumnDefinition41, 格式如下

名字

大小(字节)或者类型

描述

catalog

变长

固定为def

schema

变长

schema name

table

变长

别名, 比如 select aa.id as sb from db1.t1 as aa 就是aa

org_table

变长

实际表名字, 上面的 t1

name

变长

列别名, 上面的sb

org_name

变长

列实际名字, 就是上面的id

length of fixed length fields

1

固定0x0c

character_set

2

字符集

column_length

4

字段值的最大长度(4GB)

type

1

字段类型(63表示二进制)

flags

2

decimals

1

基本上都是之前讲过的, 就不具体介绍了

EOF包

两个EOF包完全一样, 只是方便识别数据界限而已. 毕竟没有返回行数... 所以要EOF包来区分

Payload固定5字节

header + Payload = mysql_pack

(header: 3字节大小 加 1字节seq_id)

名字

大小

描述

header

1

固定0xFE

warnings

2

2字节warning(需要warning的话,com_query执行show warnings)

status_flags

2

数据行包

这个和binlog一样....

都是长度加数据, 然后放一堆, 长度取决于数据字段类型, 字段类型来自上面的字段包

PYTHON模拟

模拟客户端发送数据, 并解析server返回的数据

脚本见文末, 或者 https://github.com/ddcw/ddcw/blob/master/python/testpymysql.py

import testpymysql
aa = testpymysql.mysql()
aa.connect()
aa.query('select aa.id as sb,aa.name from db1.t1 as aa limit 4')
for x in aa.result():
	print(x)
print(aa.des_list)
没有做数据类型转换
没有做数据类型转换

结合上一篇的连接信息, 我们就可以做个简单的读写分离了, 毕竟客户端服务端发的包都弄清楚了. 有空了可以试试 -_-

总结

1. 客户端发送SQL很简单, 直接把com_query+SQL发送到服务器上就行

2. 服务器返回数据过程: 字段数量, 字段, EOF, 行... EOF

3. 返回的数据行和binlog存储的是一样的, 都是长度+数据放一堆

4. server返回的数据行数是由客户端统计的

5.默认不返回warning, 需要自己使用show warnings去获取

附源码

在上一版的基础上新增了query, 和result

import hashlib
import socket
import struct
import os


#来自pymysql
def _lenenc_int(i):
	if i < 0:
		raise ValueError("Encoding %d is less than 0 - no representation in LengthEncodedInteger" % i)
	elif i < 0xFB:
		return bytes([i])
	elif i < (1 << 16):
		return b"\xfc" + struct.pack("<H", i)
	elif i < (1 << 24):
		return b"\xfd" + struct.pack("<I", i)[:3]
	elif i < (1 << 64):
		return b"\xfe" + struct.pack("<Q", i)
	else:
		raise ValueError("Encoding %x is larger than %x - no representation in LengthEncodedInteger"% (i, (1 << 64)))

def native_password(password,salt):
	stage1 = hashlib.sha1(password).digest()
	stage2 = hashlib.sha1(stage1).digest()
	
	rp = hashlib.sha1(salt)
	rp.update(stage2)
	result = bytearray(rp.digest())
	
	for x in range(len(result)):
		result[x] ^= stage1[x]
	return result

def _read_lenenc(bdata,i): 
	length = btoint(bdata[i:i+1])
	i += 1
	data = bdata[i:i+length]
	i += length
	return data,i


def btoint(bdata,t='little'):
        return int.from_bytes(bdata,t)

class mysql(object):
	def __init__(self):
		self.host = '192.168.101.21'
		self.port = 3308
		self.user = 'root'
		self.password = '123456'

	def read_pack(self,):
		pack_header = self.rf.read(4)
		btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
		pack_size = btrl + (btrh << 16)
		self._next_seq_id = (self._next_seq_id + 1) % 256 
		bdata = self.rf.read(pack_size) #也懒得考虑超过16MB的包了
		return bdata

	def write_pack(self,data):
		#3字节长度, 1字节seq, data
		bdata = struct.pack("<I", len(data))[:3] + bytes([self._next_seq_id]) + data
		self.sock.sendall(bdata)
		self._next_seq_id = (self._next_seq_id + 1) % 256

	def handshake(self,bdata):
		i = 0 #已经读取的字节数, 解析binlog的时候也是这么用的.....
		protocol_version = bdata[:1] #只解析10

		server_end = bdata.find(b"\0", i)
		self.server_version = bdata[i:server_end]
		i = server_end + 1

		self.thread_id = btoint(bdata[i:i+4])
		i += 4

		self.salt = bdata[i:i+8]
		i += 9 #还有1字节的filter, 没啥意义,就不保存了

		self.server_capabilities = btoint(bdata[i:i+2])
		i += 2

		self.server_charset = btoint(bdata[i:i+1])
		i += 1

		self.server_status = btoint(bdata[i:i+2])
		i += 2
		
		self.server_capabilities |= btoint(bdata[i:i+2]) << 16 #往左移16位 为啥不把capability_flags_1和capability_flags_2和一起呢
		i += 2

		salt_length = struct.unpack('<B',bdata[i:i+1])[0] #懒得去判断capabilities & CLIENT_PLUGIN_AUTH了
		salt_length = max(13,salt_length-8) #前面已经有8字节了
		i += 1

		i += 10 #reserved

		self.salt += bdata[i:i+salt_length]
		i += salt_length

		self.server_plugname = bdata[i:]

	def HandshakeResponse41(self,):
		client_flag = 3842565 #不含DBname   
		#client_flag |= 1 << 3

		charset_id = 45 #45:utf8mb4  33:utf8

		#bdata = client_flag.to_bytes(4,'little') #其实应该最后在加, 毕竟还要判断很多参数, 可能还需要修改, 但是懒
		bdata = struct.pack('<iIB23s',client_flag,2**24-1,charset_id,b'')

		bdata += self.user.encode() + b'\0'
		
		auth_password = native_password(self.password.encode(), self.salt[:20])
		auth_response = _lenenc_int(len(auth_password)) + auth_password 
		bdata += auth_response

		bdata += b"mysql_native_password" + b'\0'

		#本文有设置连接属性, 主要是为了方便观察
		attr = {'_client_name':'ddcw_for_pymysql', '_pid':str(os.getpid()), "_client_version":'0.0.1',}
		#key长度+k+v长度+v
		connect_attrs = b""
		for k, v in attr.items():
			k = k.encode()
			connect_attrs += _lenenc_int(len(k)) + k
			v = v.encode()
			connect_attrs += _lenenc_int(len(v)) + v
		bdata += _lenenc_int(len(connect_attrs)) + connect_attrs
		self.write_pack(bdata)
			
		auth_pack = self.read_pack() #看看是否连接成功
		if auth_pack[:1] == b'\0':
			print('OK',)
		else:
			print('FAILED',auth_pack)
		

	def query(self,sql):
		"""不考虑SQL超过16MB情况"""
		# payload_length:3  sequence_id:1 payload:N
		# payload: com_query(0x03):1 sql:n
		bdata = struct.pack('<IB',len(sql)+1,0x03) #I:每个com_query的seq_id都从0开始,第4字节固定为0, 所以直接用I, +1:com_query占用1字节,  0x03:com_query
		bdata += sql.encode()
		self.sock.sendall(bdata)
		self._next_seq_id = 1 #下一个包seq_id = 1

	def result(self):
		#https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query_response_text_resultset_column_definition.html
		#Protocol::ColumnDefinition41
		#字段数量
		stat = self.read_pack()
		filed_count = struct.unpack('<B',stat)[0] #不考虑0xFF(error) 0xFB(字段太多) 0x00(无返回数据,就是成功)

		#字段描述(字段数据类型)
		des_list = []
		for x in range(filed_count):
			i = 0
			bdata = self.read_pack()
			catalog,i = _read_lenenc(bdata,i)
			schema,i = _read_lenenc(bdata,i)
			table,i = _read_lenenc(bdata,i)
			org_table,i = _read_lenenc(bdata,i)
			name,i = _read_lenenc(bdata,i)
			org_name,i = _read_lenenc(bdata,i)
			i += 1 #0x0c
			character_set = btoint(bdata[i:i+2])
			i += 2
			column_length = btoint(bdata[i:i+4])
			i += 4
			_type = btoint(bdata[i:i+1]) #只解析int和str, 之前解析binlog的时候还有date.... 算了
			i += 1
			flags = btoint(bdata[i:i+2])
			i += 2
			decimals = btoint(bdata[i:i+1])
			i += 1
			des_list.append([catalog,schema,table,org_table,name,org_name,character_set,column_length,_type,flags,decimals]) 
			
		self.des_list = des_list
		bdata = self.read_pack() #EOF包
		warnings = btoint(bdata[1:3])
		row = []
		while True:
			bdata = self.read_pack()
			if bdata[0:1] == b'\xfe': #EOF包
				break
			_row = []
			i = 0
			for x in des_list:
				length = btoint(bdata[i:i+1]) #不考虑长字符
				i += 1
				_row.append(bdata[i:i+length]) #懒得做数据类型转换了
			row.append(_row)
		print(f'warnings:{warnings}  rows:{len(row)}')
		return row
		
		

	def connect(self):
		sock = socket.create_connection((self.host, self.port))
		sock.settimeout(None)
		self.sock = sock
		self.rf = sock.makefile("rb")
		self._next_seq_id = 0

		#解析server的握手包
		bdata = self.read_pack()
		self.handshake(bdata)

		#握手.发账号密码
		self.HandshakeResponse41()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 发送SQL(COM_QUERY)
  • 接收数据
    • 流程
      • 字段包
        • EOF包
          • 数据行包
          • PYTHON模拟
          • 总结
          • 附源码
          相关产品与服务
          云数据库 MySQL
          腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档