"""Illustrates use of the sqlalchemy.ext.asyncio.AsyncSession objectfor asynchronous ORM use."""importasynciofromsqlalchemyimportColumnfromsqlalchemyimportDateTimefromsqlalchemyimportForeignKeyfromsqlalchemyimportfuncfromsqlalchemyimportIntegerfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.futureimportselectfromsqlalchemy.ormimportdeclarative_basefromsqlalchemy.ormimportrelationshipfromsqlalchemy.ormimportselectinloadfromsqlalchemy.ormimportsessionmakerBase=declarative_base()classA(Base):__tablename__="a"id=Column(Integer,primary_key=True)data=Column(String)create_date=Column(DateTime,server_default=func.now())bs=relationship("B")# required in order to access columns with server defaults# or SQL expression defaults, subsequent to a flush, without# triggering an expired load__mapper_args__={"eager_defaults":True}classB(Base):__tablename__="b"id=Column(Integer,primary_key=True)a_id=Column(ForeignKey("a.id"))data=Column(String)asyncdefasync_main():"""Main program function."""engine=create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test",echo=True,)asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.drop_all)asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.create_all)# expire_on_commit=False will prevent attributes from being expired# after commit.async_session=sessionmaker(engine,expire_on_commit=False,class_=AsyncSession)asyncwithasync_session()assession:asyncwithsession.begin():session.add_all([A(bs=[B(),B()],data="a1"),A(bs=[B()],data="a2"),A(bs=[B(),B()],data="a3"),])# for relationship loading, eager loading should be applied.stmt=select(A).options(selectinload(A.bs))# AsyncSession.execute() is used for 2.0 style ORM execution# (same as the synchronous API).result=awaitsession.execute(stmt)# result is a buffered Result object.fora1inresult.scalars():print(a1)print(f"created at: {a1.create_date}")forb1ina1.bs:print(b1)# for streaming ORM results, AsyncSession.stream() may be used.result=awaitsession.stream(stmt)# result is a streaming AsyncResult object.asyncfora1inresult.scalars():print(a1)forb1ina1.bs:print(b1)result=awaitsession.execute(select(A).order_by(A.id))a1=result.scalars().first()a1.data="new data"awaitsession.commit()asyncio.run(async_main())