异步操作SQLAlchemy

这段Python代码使用了异步编程和SQLAlchemy ORM(对象关系映射)库来与数据库进行交互。代码的主要功能包括数据表的创建、数据查询以及数据插入。下面是对代码的详细解释:

  1. 异步函数定义:

    async def main():

    定义了一个异步函数main,这是异步编程的基础,允许函数在等待I/O操作(如数据库查询)完成时释放控制权。

  2. 异步数据迁移:

    async with db.engine.begin() as conn: await conn.run_sync(db.Model.metadata.create_all)

    这部分代码使用了一个异步上下文管理器db.engine.begin()来开始一个数据库事务。在这个事务中,它调用了create_all方法来创建当前程序中所有模型对应的数据表,如果这些表还不存在的话。run_sync方法用于在异步上下文中同步执行SQLAlchemy的create_all函数。

  3. 开启会话并进行查询和插入:

    async with db.async_session() as session: async with session.begin():

    这里使用另一个异步上下文管理器db.async_session()来开启一个异步会话。在这个会话中,再次使用session.begin()开启了一个事务,用于执行查询和插入操作。

  4. 查询数据:

    sql = db.select(Student).filter(Student.classes == 305).order_by(Student.id) student = await session.execute(sql) print(student.first()) print(student.mappings().all())
    • 构建了一个查询语句,选择Student模型中classes等于305的记录,并按id排序。
    • 使用await session.execute(sql)异步执行查询。
    • student.first()获取查询结果中的第一条记录。
    • student.mappings().all()获取查询结果的所有记录,并以字典形式返回。
  5. 写入数据:

    student1 = Student(name="小明1号", classes="302", sex=True, age=18, description="滚出去..") # ...(其他学生对象的创建) session.add(student1) session.add_all([student1, student2, student3, student4]) await session.commit()
    • 创建了几个Student对象,代表要插入数据库的学生记录。
    • 使用session.add(student1)添加单个学生记录到会话中。
    • 使用session.add_all([...])添加多个学生记录到会话中。注意这里有一个错误:student1被重复添加了,这可能是一个编码时的疏忽。
    • 使用await session.commit()异步提交事务,将所有添加到会话中的记录保存到数据库中。

注意事项:

  • 代码中有一行被注释掉了:

    # await conn.run_sync(Model.metadata.drop_all)

    这行代码如果取消注释,将会删除当前程序中所有模型对应的数据表。这通常用于重置数据库到初始状态,但在生产环境中使用时需要谨慎。

  • 异步编程和SQLAlchemy的结合使得数据库操作更加高效,特别是在I/O密集型的应用中,因为它允许在等待数据库响应的同时执行其他任务。

### 使用异步关系(Relationships)在 SQLAlchemy 中 为了实现异步操作,在 SQLAlchemy 1.4 版本之后引入了 `asyncio` 支持,这使得可以利用 Python 的原生协程特性来执行数据库查询。当涉及到定义模型之间的关系时,核心概念保持不变;然而,加载这些关系的方式可以通过异步方式完成。 #### 定义带有异步支持的关系 首先,确保安装了最新版本的 SQLAlchemy 和相应的异步驱动程序,比如 psycopg2-binary 对于 PostgreSQL 数据库: ```bash pip install sqlalchemy[asyncpg] psycopg2-binary ``` 接着创建一个简单的例子展示如何设置两个表之间的一对多关系,并通过异步会话访问它们的数据: ```python from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import declarative_base, relationship, sessionmaker from sqlalchemy.future import select from sqlalchemy import Column, Integer, String, ForeignKey Base = declarative_base() class Parent(Base): __tablename__ = 'parent' id = Column(Integer, primary_key=True) name = Column(String(50)) children = relationship("Child", back_populates="parent") # 关系字段声明[^1] class Child(Base): __tablename__ = 'child' id = Column(Integer, primary_key=True) parent_id = Column(ForeignKey('parent.id')) name = Column(String(50)) parent = relationship("Parent", back_populates="children") # 创建引擎并配置会话工厂 engine = create_async_engine( "postgresql+asyncpg://user:password@localhost/dbname", echo=True, ) AsyncSessionFactory = sessionmaker(engine, class_=AsyncSession) async def get_children_of_parent(parent_name: str): """获取指定父对象下的所有子对象""" async with AsyncSessionFactory() as session: result = await session.execute(select(Parent).where(Parent.name == parent_name)) # 异步查询语句[^2] parent = result.scalars().first() if parent is None: raise ValueError(f"No such parent named {parent_name}") return parent.children # 访问关联属性以触发懒加载或立即加载取决于配置 if __name__ == "__main__": from asyncio import run try: children = run(get_children_of_parent("example")) print([c.name for c in children]) except Exception as e: print(e) ``` 在这个案例里,注意几点重要的地方: - 使用 `create_async_engine()` 来代替普通的同步版函数。 - 当调用 `session.execute()` 方法时采用的是异步形式。 - 可以像平常一样定义 ORM 映射类中的关系,但在实际应用中应当考虑使用合适的加载策略(如 lazy loading 或 eager loading),以便更好地控制何时以及怎样从数据库读取数据。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值