最近在做项目中的耗时任务优化,将这些耗时任务接口函数放到 airflow 上,但是一些接口函数涉及到很多的数据库操作,就需要使用第三方库操作数据库 db 数据,提倡使用 ORM 操作数据库,所以就选择了这个 SQLAlchemy 这个库,用的是它的 ORM 模式。本次简单记录下用到的内容。
SQLAlchemy 是一个 Python 语言实现的的针对关系型数据库的 orm 库。可用于连接大多数常见的数据库,比如 Postges、MySQL、SQLite、Oracle等。
SQLAlchemy orm 使用步骤是需要先定义数据表结构,就是通过定义一个类,它继承自一个名为 declarative_base 的特殊基类。declarative_base 把元数据容器和映射器(用来把类映射到数据表)结合在一起。然后调用它的方法来对数据表的数据进行增删改查
orm使用的类应该满足如下四个要求:
继承自declarative_base对象
包含__tablename__,这是数据库中使用的表名
包含一个或多个属性,它们都是column对象
确保一个或多个属性组成主键
会话(session)是 SQLAlchemy ORM 和数据库交互的方式。它通过引擎包装数据库连接,并为通过会话加载或与会话关联的对象提供标识映射(identity map)。标识映射是一种类似于缓存的数据结构,它包含由对象表和主键确定的一个唯一的对象列表。会话还包装了一个事务,这个事务将一直保持打开状态,直到会话提交或回滚。
为创建会话,SQLAlchemy 提供了一个 sessionmaker 类,这个类可以确保在整个应用程序中能够使用相同的参数创建会话。sessionmaker 类通过创建一个Session类来实现这一点,Session 类是根据传递给sessionmaker工厂的参数配置的。
DB_CONN_ID = 'ld_smp_task'
conn: MySqlHook = MySqlHook(mysql_conn_id=DB_CONN_ID)
engine = conn.get_sqlalchemy_engine()
Session = sessionmaker(engine)
db_session = Session()
# 增加数据
data = DiskInfo(create_time=datetime.datetime.now(), data_source_system=data_source_system, root_dir=root_dir,
size_byte=size_bytes)
db_session.add(data)
db_session.commit()
# 更新数据
db_session.query(FileExportTask).filter_by(id=task_id).update({'start_time': start_time,
'task_status': TaskStatus.PROCESSING,
'update_time': start_time},
synchronize_session=False)
db_session.commit()
# 查看数据
db_session.query(DiskInfo).filter_by(data_source_system=data_source_system).first()
done_items = db_session.query(SceneExportTask).filter_by(parent_task_id=task_id).count()
# 关闭链接
db_session.close
每次执行一个增删改操作都要 commit 操作,最后执行 close 操作
本文分享自 pythonista的日常 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!