首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Python数据库操作 | MySQL连接池 | Redis缓存应用

兄弟们,今天来聊聊Python中数据库操作的那些事。作为后端开发,数据库操作可以说是吃饭的家伙,写了这么多年代码,总结出不少实战经验,今天和大家分享一下。

先来看MySQL的连接池实现。单个数据库连接在并发场景下显然是不够用的,而频繁创建销毁连接又会带来性能开销,这时候就需要连接池来解决问题:

import pymysql

from DBUtils.PooledDB import PooledDB

import threading

class MySQLPool:

_instance_lock = threading.Lock()

_pool = None

def __new__(cls):

with cls._instance_lock:

if not hasattr(cls, '_instance'):

cls._instance = super().__new__(cls)

return cls._instance

def __init__(self):

if not self._pool:

self._pool = PooledDB(

creator=pymysql,

maxconnections=10, # 最大连接数

mincached=2, # 初始化时,最小的空闲连接数

maxcached=5, # 最大的空闲连接数

maxshared=3, # 最大共享连接数

blocking=True, # 连接数达到最大时是否阻塞

maxusage=None, # 一个连接最多被重复使用的次数

host='localhost',

port=3306,

user='root',

password='password',

database='test',

charset='utf8mb4'

)

def get_conn(self):

return self._pool.connection()

有了连接池,咱们来封装一个数据库操作类,支持事务和上下文管理:

class DBManager:

def __init__(self):

self.pool = MySQLPool()

self.conn = None

self.cursor = None

def __enter__(self):

self.conn = self.pool.get_conn()

self.cursor = self.conn.cursor(pymysql.cursors.DictCursor)

return self

def __exit__(self, exc_type, exc_val, exc_tb):

if exc_type:

self.conn.rollback()

else:

self.conn.commit()

self.cursor.close()

self.conn.close()

def execute(self, sql, params=None):

return self.cursor.execute(sql, params)

def fetchall(self):

return self.cursor.fetchall()

实际使用示例:

def add_user(username, email):

with DBManager() as db:

sql = "INSERT INTO users (username, email) VALUES (%s, %s)"

db.execute(sql, (username, email))

def get_user(user_id):

with DBManager() as db:

sql = "SELECT * FROM users WHERE id = %s"

db.execute(sql, (user_id,))

return db.fetchall()

说到数据库操作,就不得不提到Redis缓存。在高并发系统中,合理使用Redis可以大幅提升系统性能:

import redis

from functools import wraps

import json

class RedisClient:

_instance = None

def __new__(cls):

if cls._instance is None:

cls._instance = super().__new__(cls)

# 创建连接池

pool = redis.ConnectionPool(

host='localhost',

port=6379,

db=0,

max_connections=100

)

cls._instance.redis = redis.Redis(connection_pool=pool)

return cls._instance

def cache_decorator(expire_time=300):

def decorator(func):

@wraps(func)

def wrapper(*args, **kwargs):

# 生成缓存key

cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"

redis_client = RedisClient().redis

# 尝试从缓存获取

cached_result = redis_client.get(cache_key)

if cached_result:

return json.loads(cached_result)

# 缓存未命中,执行函数

result = func(*args, **kwargs)

# 存入缓存

redis_client.setex(

cache_key,

expire_time,

json.dumps(result)

)

return result

return wrapper

return decorator

来个实际应用场景,结合MySQL和Redis实现一个用户系统:

class UserService:

def __init__(self):

self.redis_client = RedisClient().redis

@cache_decorator(expire_time=3600)

def get_user_info(self, user_id):

with DBManager() as db:

sql = """

SELECT u.*, p.phone

FROM users u

LEFT JOIN user_profiles p ON u.id = p.user_id

WHERE u.id = %s

"""

db.execute(sql, (user_id,))

return db.fetchall()

def update_user_info(self, user_id, data):

with DBManager() as db:

# 更新用户信息

update_sql = "UPDATE users SET username = %s WHERE id = %s"

db.execute(update_sql, (data['username'], user_id))

# 删除缓存

cache_key = f"get_user_info:({user_id},):{{}}"

self.redis_client.delete(cache_key)

在处理高并发场景时,还需要考虑缓存击穿、缓存穿透等问题。这里实现一个带布隆过滤器的缓存装饰器:

最后说两句数据库操作的注意事项:在生产环境中,要做好异常处理和重试机制,这里有个通用的重试装饰器:

from tenacity import retry, stop_after_attempt, wait_exponential

@retry(

stop=stop_after_attempt(3),

wait=wait_exponential(multiplier=1, min=4, max=10),

reraise=True

)

def db_operation_with_retry():

with DBManager() as db:

# 数据库操作

pass

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OVY-ivSykUZTNiaYt-eW3G7A0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券