兄弟们,今天来聊聊Python中数据库操作的那些事。作为后端开发,数据库操作可以说是吃饭的家伙,写了这么多年代码,总结出不少实战经验,今天和大家分享一下。
先来看MySQL的连接池实现。单个数据库连接在并发场景下显然是不够用的,而频繁创建销毁连接又会带来性能开销,这时候就需要连接池来解决问题:
import pymysql
from DBUtils.PooledDB import PooledDB
import threading
class MySQLPool:
    """Thread-safe singleton wrapping a pymysql connection pool (PooledDB)."""

    _instance_lock = threading.Lock()
    _pool = None

    def __new__(cls):
        # Double-checked locking: the fast path skips the lock entirely
        # once the singleton exists.
        if not hasattr(cls, '_instance'):
            with cls._instance_lock:
                if not hasattr(cls, '_instance'):
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        # BUG FIX: __init__ runs on every MySQLPool() call, AFTER the lock
        # taken in __new__ has been released.  Without re-taking the lock,
        # two threads could both observe _pool as None and build two pools.
        if self._pool is None:
            with self._instance_lock:
                if self._pool is None:
                    # Assign on the class (not the instance) so the guard
                    # above stays meaningful for every reference.
                    type(self)._pool = PooledDB(
                        creator=pymysql,
                        maxconnections=10,  # hard upper bound on open connections
                        mincached=2,        # idle connections created at startup
                        maxcached=5,        # max idle connections kept around
                        maxshared=3,        # max shared connections
                        blocking=True,      # block instead of raising when exhausted
                        maxusage=None,      # unlimited reuse per connection
                        host='localhost',
                        port=3306,
                        user='root',
                        password='password',
                        database='test',
                        charset='utf8mb4'
                    )

    def get_conn(self):
        """Borrow a connection from the pool; the caller must close() it."""
        return self._pool.connection()
有了连接池,咱们来封装一个数据库操作类,支持事务和上下文管理:
class DBManager:
    """Context-managed DB access: commit on success, rollback on exception.

    Usage:
        with DBManager() as db:
            db.execute(sql, params)
    """

    def __init__(self):
        self.pool = MySQLPool()
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = self.pool.get_conn()
        # DictCursor makes fetch* return dicts keyed by column name.
        self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # BUG FIX: the original closed the cursor/connection only after
        # commit()/rollback() returned; if either raised, the connection
        # was never returned to the pool.  try/finally guarantees cleanup.
        try:
            if exc_type:
                self.conn.rollback()
            else:
                self.conn.commit()
        finally:
            if self.cursor is not None:
                self.cursor.close()
            if self.conn is not None:
                # close() on a pooled connection returns it to the pool.
                self.conn.close()

    def execute(self, sql, params=None):
        """Run a parameterized statement; returns the affected row count."""
        return self.cursor.execute(sql, params)

    def fetchall(self):
        """Return all rows of the last execute() as a list of dicts."""
        return self.cursor.fetchall()
实际使用示例:
def add_user(username, email):
    """Insert a new users row; commit happens when the context manager exits."""
    insert_sql = "INSERT INTO users (username, email) VALUES (%s, %s)"
    with DBManager() as db:
        db.execute(insert_sql, (username, email))
def get_user(user_id):
    """Look up users rows by primary key; returns a list of dicts."""
    select_sql = "SELECT * FROM users WHERE id = %s"
    with DBManager() as db:
        db.execute(select_sql, (user_id,))
        rows = db.fetchall()
    return rows
说到数据库操作,就不得不提到Redis缓存。在高并发系统中,合理使用Redis可以大幅提升系统性能:
import redis
from functools import wraps
import json
class RedisClient:
    """Thread-safe singleton holding one pooled redis.Redis client as .redis."""

    _instance = None
    _instance_lock = threading.Lock()

    def __new__(cls):
        # BUG FIX: the original had no lock, so two threads racing through
        # `cls._instance is None` could each build a ConnectionPool.
        # Double-checked locking, consistent with MySQLPool above.
        if cls._instance is None:
            with cls._instance_lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    pool = redis.ConnectionPool(
                        host='localhost',
                        port=6379,
                        db=0,
                        max_connections=100
                    )
                    instance.redis = redis.Redis(connection_pool=pool)
                    # Publish only after the attribute is fully set up.
                    cls._instance = instance
        return cls._instance
def cache_decorator(expire_time=300):
    """Cache a function's JSON-serializable return value in Redis.

    expire_time: TTL in seconds for the cached entry (default 300).

    NOTE(review): the key embeds str(args), so for a bound method the key
    contains repr(self) (including its memory address) — any manual
    invalidation must rebuild the key the same way, and caching is
    effectively per-instance.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Key layout: "<func name>:<positional args>:<keyword args>"
            cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
            redis_client = RedisClient().redis
            cached_result = redis_client.get(cache_key)
            # BUG FIX: get() returns None on a miss; test identity against
            # None instead of truthiness so a falsy cached payload still
            # counts as a hit.
            if cached_result is not None:
                return json.loads(cached_result)
            # Miss: compute, then store with TTL (SETEX is atomic).
            result = func(*args, **kwargs)
            redis_client.setex(
                cache_key,
                expire_time,
                json.dumps(result)
            )
            return result
        return wrapper
    return decorator
来个实际应用场景,结合MySQL和Redis实现一个用户系统:
class UserService:
    """User reads (Redis-cached for 1h) and writes (with cache invalidation)."""

    def __init__(self):
        self.redis_client = RedisClient().redis

    @cache_decorator(expire_time=3600)
    def get_user_info(self, user_id):
        """Fetch the user row left-joined with its profile phone number."""
        with DBManager() as db:
            sql = """
SELECT u.*, p.phone
FROM users u
LEFT JOIN user_profiles p ON u.id = p.user_id
WHERE u.id = %s
"""
            db.execute(sql, (user_id,))
            return db.fetchall()

    def update_user_info(self, user_id, data):
        """Update the username, then invalidate the cached get_user_info entry."""
        with DBManager() as db:
            update_sql = "UPDATE users SET username = %s WHERE id = %s"
            db.execute(update_sql, (data['username'], user_id))
        # BUG FIX: cache_decorator builds its key from str(args) where
        # args == (self, user_id) for this bound method, so the stored key
        # contains repr(self).  The old literal "get_user_info:(id,):{}"
        # could never match, so stale cache was never deleted.  Rebuild the
        # key exactly as the decorator does, and delete only after the
        # transaction has committed (context manager exited).
        cache_key = f"get_user_info:{str((self, user_id))}:{str({})}"
        self.redis_client.delete(cache_key)
在处理高并发场景时,还需要考虑缓存击穿、缓存穿透等问题,常见的应对手段包括布隆过滤器拦截无效查询、互斥锁防止并发回源,以及给缓存过期时间加随机抖动,篇幅所限这里不再展开。
最后说两句数据库操作的注意事项:在生产环境中,要做好异常处理和重试机制,这里有个通用的重试装饰器:
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    reraise=True
)
def db_operation_with_retry():
    # Skeleton showing tenacity-based retry around a DB transaction:
    # up to 3 attempts, exponential backoff between 4s and 10s, and the
    # original exception is re-raised once all attempts are exhausted.
    # NOTE: the whole `with` block is retried, so each attempt gets a
    # fresh connection and its own commit/rollback.
    with DBManager() as db:
        # Place the actual database operations here.
        pass
以上就是今天关于Python数据库操作的全部内容,希望这些实战经验对大家有所帮助。