Source code for examples.asyncio.gather_orm_statements
"""Illustrates how to run many statements concurrently using ``asyncio.gather()``along many asyncio database connections, merging ORM results into a single``AsyncSession``.Note that this pattern loses all transactional safety and is also notnecessarily any more performant than using a single Session, as it addssignificant CPU-bound work both to maintain more database connectionsand sessions, as well as within the merging of results from external sessionsinto one.Python is a CPU-intensive language even in trivial cases, so it is stronglyrecommended that any workarounds for "speed" such as the one below arecarefully vetted to show that they do in fact improve performance vs atraditional approach."""importasyncioimportrandomfromsqlalchemyimportColumnfromsqlalchemyimportIntegerfromsqlalchemyimportStringfromsqlalchemy.ext.asyncioimportAsyncSessionfromsqlalchemy.ext.asyncioimportcreate_async_enginefromsqlalchemy.ext.declarativeimportdeclarative_basefromsqlalchemy.futureimportselectfromsqlalchemy.ormimportmerge_frozen_resultfromsqlalchemy.ormimportsessionmakerBase=declarative_base()classA(Base):__tablename__="a"id=Column(Integer,primary_key=True)data=Column(String)asyncdefrun_out_of_band(sessionmaker,session,statement,merge_results=True):"""run an ORM statement in a distinct session, merging the result back into the given session. """asyncwithsessionmaker()asoob_session:# use AUTOCOMMIT for each connection to reduce transaction# overhead / contentionawaitoob_session.connection(execution_options={"isolation_level":"AUTOCOMMIT"})# pre 1.4.24# await oob_session.run_sync(# lambda sync_session: sync_session.connection(# execution_options={"isolation_level": "AUTOCOMMIT"}# )# )result=awaitoob_session.execute(statement)ifmerge_results:# merge_results means the ORM objects from the result# will be merged back into the original session.# load=False means we can use the objects directly without# re-selecting them. however this merge operation is still# more expensive CPU-wise than a regular ORM load because the# objects are copied into new instancesreturn(awaitsession.run_sync(merge_frozen_result,statement,result.freeze(),load=False,))()else:awaitresult.close()asyncdefasync_main():engine=create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test",echo=True,)asyncwithengine.begin()asconn:awaitconn.run_sync(Base.metadata.drop_all)awaitconn.run_sync(Base.metadata.create_all)async_session=sessionmaker(engine,expire_on_commit=False,class_=AsyncSession)asyncwithasync_session()assession,session.begin():session.add_all([A(data="a_%d"%i)foriinrange(100)])statements=[select(A).where(A.data=="a_%d"%random.choice(range(100)))foriinrange(30)]results=awaitasyncio.gather(*(run_out_of_band(async_session,session,statement)forstatementinstatements))print(f"results: {[r.all()forrinresults]}")asyncio.run(async_main())