如果你看了前两章(连接协议解析,执行查询解析)的话, 而你又有点编程基础的话, 你应该就能制作一个简单的读写分离中间件了.
恰好我都会点点, 那就制作一个简单的读写分离中间件吧.
由于还不会lex, 所以就在sql里面添加hint来提示我们中间件 这条sql需要读分离. 暂不考虑事务情况. 就假设主从数据是完全一致的
大概流程如下, 就是自己监控一个端口, 当client连接上来的时候, 默认转发给 MYSQL RW(可读可写), 如果匹配到关键字, 比如:/*ddcw_read*/后, 就转发到MYSQL RO (只读,一般为从库)
每个连接一个线程, 该线程再分出去两个线程, 一个监控client发来的数据, 另一个监控MYSQL RW发来的数据, 然后根据条件做转发.
完整代码见文末, 这只给伪代码演示
#MAIN
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
accept_client_thread = Thread(target=self.accept_client,daemon=True)
accept_client_thread.start()
accept_client_thread.join()
#accept_client
conn, addr = self.socket_server.accept()
thread = Thread(target=self.handler,args=(conn,addr),daemon=True)
thread.start()
#handler
sock = socket.create_connection((self.w[0], self.w[1])) #连接后端 MYSQL RW
t1 = Thread(target=self.hashread,args=(conn,sock)) #监控client发来的包
t2 = Thread(target=self.hashread,args=(sock,conn,)) #监控mysql rw发来的包
判断数据包类型和是否包含关键字符串
bdata.find(b'/*ddcw_read*/') #找到需要读均衡的包
mid = hash(time.time())%self.length
ts = self.ri[mid]
ts[0].sendall(bdata) #讲该包转到随机的后台MYSQL RO
data[4:5] == b'\xfe' #判断query包结束
有个小插曲, 我解析server发来的包, 和上一篇解析的完全一样, 但是转发到client就不行, client收到empty set, 然后断开连接了.
后来对比发现, 正常的包差一个EOF开头, EOF结尾多了两空白字符, 我也修改为这样后就可以了....
由于还是测试版本, 没得接口, 直接修改源码就是了
self.host 绑定的IP地址
self.port 绑定的端口
self.w 读写 MSYQL RW
self.r 仅读, MYSQL RO
由于使用了hint, 所以使用mysql命令的时候要加个 -c
或者 --comments
暂不支持ssl 所以也要 --skip-ssl
查询下server id, 发现每次查询(hash(time.time()))的不一样, 说明读分离成功了.
创建一张表, 然后插入两条数据, 然后去从库删掉一掉数据, 再使用/*ddcw_read*/查询
发现数据是在 1条 和2条之间切换, 说明读写分离成功了的.
mysql的读写分离中间件还是比较多的, 不过都并不是那么好用, 比如官方的mysql-router, 要使用端口来区分. 就不是那么滴方便. 自己写,虽然也能实现简单的读写分离, 但是功能差得太多.
不过写着玩还是不错的, 能增长见识. (会lex后就能做分布式了)
本次实验的 mysql_rw.py 见github 或者如下
就是根据上上章的mysql_joker改编的
import struct
from threading import Thread
import socket
import time
import testpymysql
def btoint(bdata,t='little'):
return int.from_bytes(bdata,t)
def read_pack(rf):
pack_header = rf.read(4)
if len(pack_header) < 4:
print(pack_header,' bye!')
exit(2)
btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
pack_size = btrl + (btrh << 16)
bdata = rf.read(pack_size)
if bdata.find(b'/*ddcw_read*/') == -1:
return pack_header+bdata,False
else:
return pack_header+bdata,True
class mrw(object):
def __init__(self):
self.host = '0.0.0.0'
self.port = 3306
self.w = ('192.168.101.21',3308,)
self.r = (('192.168.101.21',3308,'root','123456'), ('192.168.101.19',3306,'root','123456'))
self.length = len(self.r)
self.ri = []
for x in range(self.length):
aa = testpymysql.mysql()
aa.host = self.r[x][0]
aa.port = self.r[x][1]
aa.user = self.r[x][2]
aa.password = self.r[x][3]
aa.connect()
self.ri.append([aa.sock,aa.rf])
def hashread(self,client_sock,server_sock,):
rf = client_sock.makefile('rb')
while True:
bdata,status = read_pack(rf)
#print('seq:',btoint(bdata[3:4]),bdata[4:5],bdata)
if status:
mid = hash(time.time())%self.length
ts = self.ri[mid]
ts[0].sendall(bdata)
eof = 0
tdata = b''
seq = 1
while eof <2:
data,status = read_pack(ts[1])
if data[4:5] == b'\xfe':
eof += 1
if eof == 1:
continue
data = bytearray(data)
data[3:4] = struct.pack('<B',seq)
if eof == 2:
data[0:3] = b'\x07\x00\x00'
data += b'\x00\x00'
#print('seq:',btoint(data[3:4]),data[4:5],data)
client_sock.sendall(data)
seq += 1
else:
server_sock.sendall(bdata)
def handler(self,conn,addr):
#连接SERVER
sock = socket.create_connection((self.w[0], self.w[1]))
sock.settimeout(None)
#1个监控conn发来的数据,然后转发, 一个监控server发来的数据, 然后转发
t1 = Thread(target=self.hashread,args=(conn,sock))
t2 = Thread(target=self.hashread,args=(sock,conn,))
t1.start()
t2.start()
t1.join()
t2.join()
def init(self):
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket_server.bind((self.host, self.port))
socket_server.listen(151)
self.socket_server = socket_server
accept_client_thread = Thread(target=self.accept_client,daemon=True)
accept_client_thread.start()
accept_client_thread.join()
def accept_client(self,):
while True:
conn, addr = self.socket_server.accept()
thread = Thread(target=self.handler,args=(conn,addr),daemon=True)
thread.start()
aa = mrw()
aa.init()
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。