# Source code for examples.asyncio.gather_orm_statements

"""
Illustrates how to run many statements concurrently using ``asyncio.gather()``
across many asyncio database connections, merging ORM results into a single
``AsyncSession``.

Note that this pattern loses all transactional safety and is also not
necessarily any more performant than using a single Session, as it adds
significant CPU-bound work, both in maintaining more database connections
and sessions and in merging results from external sessions into one.

Python is a CPU-intensive language even in trivial cases, so it is strongly
recommended that any workarounds for "speed" such as the one below are
carefully vetted to show that they do in fact improve performance vs a
traditional approach.

"""

import asyncio
import random

from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import merge_frozen_result
from sqlalchemy.orm import sessionmaker

# Declarative base for the mapped classes in this example; its ``metadata``
# is what async_main() passes to drop_all / create_all.
Base = declarative_base()


class A(Base):
    """Minimal mapped class, stored in table ``a``, that the example's
    SELECT statements are run against.

    """

    __tablename__ = "a"

    # integer primary key
    id = Column(Integer, primary_key=True)
    # string payload; async_main() fills it with values of the form "a_<n>"
    # and filters on it
    data = Column(String)


async def run_out_of_band(
    sessionmaker, session, statement, merge_results=True
):
    """run an ORM statement in a distinct session, optionally merging the
    result back into the given session.

    :param sessionmaker: callable that returns a new session usable as an
     async context manager; one session is created per call.  note that
     inside this function the name shadows the imported ``sessionmaker``.
    :param session: the session that receives the merged ORM objects; it is
     only used when ``merge_results`` is true.
    :param statement: the ORM statement to execute in the out-of-band
     session.
    :param merge_results: when true (the default), freeze the result and
     merge it into ``session``; when false, the result is closed and
     discarded.
    :return: the result obtained by calling what ``merge_frozen_result``
     returns, or ``None`` when ``merge_results`` is false.

    """

    async with sessionmaker() as oob_session:

        # use AUTOCOMMIT for each connection to reduce transaction
        # overhead / contention
        await oob_session.connection(
            execution_options={"isolation_level": "AUTOCOMMIT"}
        )

        # pre 1.4.24
        # await oob_session.run_sync(
        #     lambda sync_session: sync_session.connection(
        #         execution_options={"isolation_level": "AUTOCOMMIT"}
        #     )
        # )

        result = await oob_session.execute(statement)

        if not merge_results:
            # the session's execute() hands back a buffered, synchronous
            # result object; its close() is a plain method returning None,
            # so awaiting it would raise TypeError
            result.close()
            return None

        # merge_results means the ORM objects from the result
        # will be merged back into the original session.
        # load=False means we can use the objects directly without
        # re-selecting them.  however this merge operation is still
        # more expensive CPU-wise than a regular ORM load because the
        # objects are copied into new instances
        merged = await session.run_sync(
            merge_frozen_result,
            statement,
            result.freeze(),
            load=False,
        )

        # run_sync() returns what merge_frozen_result() returned, which is
        # a callable; invoking it produces the merged result
        return merged()


async def async_main():
    """recreate the ``a`` table, insert 100 rows, then run 30 SELECT
    statements concurrently via ``run_out_of_band()`` and print the merged
    results.

    the engine is disposed on the way out, including on error, so that
    pooled connections are closed while the event loop is still running.

    """

    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )

    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.drop_all)
            await conn.run_sync(Base.metadata.create_all)

        async_session = sessionmaker(
            engine, expire_on_commit=False, class_=AsyncSession
        )

        async with async_session() as session, session.begin():
            session.add_all([A(data="a_%d" % i) for i in range(100)])

        # each statement looks up one randomly chosen row by its data value
        statements = [
            select(A).where(A.data == "a_%d" % random.choice(range(100)))
            for _ in range(30)
        ]

        # ``session`` is used here after its ``async with`` block has
        # exited; it serves only as the target that results are merged into
        results = await asyncio.gather(
            *(
                run_out_of_band(async_session, session, statement)
                for statement in statements
            )
        )
        print(f"results: {[r.all() for r in results]}")
    finally:
        # for an engine created in function scope, close and clean up
        # pooled connections explicitly
        await engine.dispose()


# run the example on a new event loop; there is no ``__main__`` guard, so
# this executes whenever the module is run or imported
asyncio.run(async_main())