首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用PYTHON制作简单的 读写分离中间件

使用PYTHON制作简单的 读写分离中间件

原创
作者头像
大大刺猬
发布2023-03-20 22:01:12
1.2K0
发布2023-03-20 22:01:12
举报
文章被收录于专栏:大大刺猬大大刺猬

如果你看了前两章(连接协议解析,执行查询解析)的话, 而你又有点编程基础的话, 你应该就能制作一个简单的读写分离中间件了.

恰好我都会点点, 那就制作一个简单的读写分离中间件吧.

原理

由于还不会lex, 所以就在sql里面添加hint来提示我们中间件 这条sql需要读分离. 暂不考虑事务情况. 就假设主从数据是完全一致的

大概流程如下, 就是自己监控一个端口, 当client连接上来的时候, 默认转发给 MYSQL RW(可读可写), 如果匹配到关键字, 比如:/*ddcw_read*/后, 就转发到MYSQL RO (只读,一般为从库)

是不是很简单.....
是不是很简单.....

设计过程

初始化服务

每个连接一个线程, 该线程再分出去两个线程, 一个监控client发来的数据, 另一个监控MYSQL RW发来的数据, 然后根据条件做转发.

完整代码见文末, 这只给伪代码演示

#MAIN
socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
accept_client_thread = Thread(target=self.accept_client,daemon=True)
accept_client_thread.start()
accept_client_thread.join()

#accept_client
conn, addr = self.socket_server.accept()
thread = Thread(target=self.handler,args=(conn,addr),daemon=True)
thread.start()

#handler
sock = socket.create_connection((self.w[0], self.w[1])) #连接后端 MYSQL RW
t1 = Thread(target=self.hashread,args=(conn,sock)) #监控client发来的包
t2 = Thread(target=self.hashread,args=(sock,conn,)) #监控mysql rw发来的包

转发数据

判断数据包类型和是否包含关键字符串

bdata.find(b'/*ddcw_read*/') #找到需要读均衡的包

mid = hash(time.time())%self.length
ts = self.ri[mid]
ts[0].sendall(bdata) #讲该包转到随机的后台MYSQL RO

data[4:5] == b'\xfe' #判断query包结束

有个小插曲, 我解析server发来的包, 和上一篇解析的完全一样, 但是转发到client就不行, client收到empty set, 然后断开连接了.

后来对比发现, 正常的包差一个EOF开头, EOF结尾多了两空白字符, 我也修改为这样后就可以了....

测试

由于还是测试版本, 没得接口, 直接修改源码就是了

self.host 绑定的IP地址
self.port 绑定的端口
self.w 读写  MSYQL RW
self.r 仅读, MYSQL RO

由于使用了hint, 所以使用mysql命令的时候要加个 -c 或者 --comments

暂不支持ssl 所以也要 --skip-ssl

查询下server id, 发现每次查询(hash(time.time()))的不一样, 说明读分离成功了.

创建一张表, 然后插入两条数据, 然后去从库删掉一掉数据, 再使用/*ddcw_read*/查询

发现数据是在 1条 和2条之间切换, 说明读写分离成功了的.

总结

mysql的读写分离中间件还是比较多的, 不过都并不是那么好用, 比如官方的mysql-router, 要使用端口来区分. 就不是那么滴方便. 自己写,虽然也能实现简单的读写分离, 但是功能差得太多.

不过写着玩还是不错的, 能增长见识. (会lex后就能做分布式了)

附源码

testpymysql.py 见上一章文末或者github

本次实验的 mysql_rw.py 见github 或者如下

就是根据上上章的mysql_joker改编的

import struct
from threading import Thread
import socket
import time
import testpymysql

def btoint(bdata,t='little'):
        return int.from_bytes(bdata,t)

def read_pack(rf):
	pack_header = rf.read(4)
	if len(pack_header) < 4:
		print(pack_header,' bye!')
		exit(2)
	btrl, btrh, packet_seq = struct.unpack("<HBB", pack_header)
	pack_size = btrl + (btrh << 16)
	bdata = rf.read(pack_size)
	if bdata.find(b'/*ddcw_read*/') == -1:
		return pack_header+bdata,False
	else:
		return pack_header+bdata,True

class mrw(object): 
	def __init__(self):
		self.host = '0.0.0.0'
		self.port = 3306
		self.w = ('192.168.101.21',3308,)
		self.r = (('192.168.101.21',3308,'root','123456'), ('192.168.101.19',3306,'root','123456'))
		self.length = len(self.r)
		self.ri = []
		for x in range(self.length):
			aa = testpymysql.mysql()
			aa.host = self.r[x][0]
			aa.port = self.r[x][1]
			aa.user = self.r[x][2]
			aa.password = self.r[x][3]
			aa.connect()
			self.ri.append([aa.sock,aa.rf])
			
	def hashread(self,client_sock,server_sock,): 
		rf = client_sock.makefile('rb')
		while True:
			bdata,status = read_pack(rf)
			#print('seq:',btoint(bdata[3:4]),bdata[4:5],bdata)
			if status:
				mid = hash(time.time())%self.length
				ts = self.ri[mid]
				ts[0].sendall(bdata)
				eof = 0
				tdata = b''
				seq = 1
				while eof <2:
					data,status = read_pack(ts[1])
					if data[4:5] == b'\xfe':
						eof += 1
						if eof == 1:
							continue
					data = bytearray(data)
					data[3:4] = struct.pack('<B',seq)
					if eof == 2:
						data[0:3] = b'\x07\x00\x00'
						data += b'\x00\x00'
					#print('seq:',btoint(data[3:4]),data[4:5],data)
					client_sock.sendall(data)
					seq += 1
			else:
				server_sock.sendall(bdata)

	def handler(self,conn,addr):
		#连接SERVER
		sock = socket.create_connection((self.w[0], self.w[1]))
		sock.settimeout(None)
		
		#1个监控conn发来的数据,然后转发, 一个监控server发来的数据, 然后转发
		t1 = Thread(target=self.hashread,args=(conn,sock))
		t2 = Thread(target=self.hashread,args=(sock,conn,))
		t1.start()
		t2.start()
		t1.join()
		t2.join()

	def init(self):
		socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
		socket_server.bind((self.host, self.port))
		socket_server.listen(151)
		self.socket_server = socket_server

		accept_client_thread = Thread(target=self.accept_client,daemon=True)
		accept_client_thread.start()
		accept_client_thread.join()
		
	def accept_client(self,):
		while True:
			conn, addr = self.socket_server.accept()
			thread = Thread(target=self.handler,args=(conn,addr),daemon=True)
			thread.start()
	
aa = mrw()
aa.init()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 原理
  • 设计过程
    • 初始化服务
      • 转发数据
      • 测试
      • 总结
      • 附源码
      相关产品与服务
      云数据库 MySQL
      腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档