Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Pytest fixture to migrate db using sqlalchemy with async driver

Problem

I'm trying to write a fixture that migrates an SQLite database to the state described by ORM models based on declarative_base.
Following the guide from the SQLAlchemy docs, I've reached a somewhat working solution, but I have no idea how to get rid of the await migrate_db.__anext__() call, or how to make the migrate_db fixture run automatically before the tests in the module.

from asyncio import current_task

import pytest
from sqlalchemy import Column, String, Integer, SmallInteger, select
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, async_scoped_session, \
    AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker

# Declarative base class; its metadata collects the tables of the models below.
Base = declarative_base()


class Item(Base):
    """ORM model mapped to the ``items`` table."""

    __tablename__ = "items"

    # Integer primary key of the row.
    id = Column(Integer, primary_key=True)
    # Item name stored as a string column.
    name = Column(String)
    # Item count stored as a small-integer column.
    quantity = Column(SmallInteger)


@pytest.fixture(scope="module", autouse=True)
def engine() -> AsyncEngine:
    """Yield the async SQLAlchemy engine shared by the tests of this module.

    The URL names no database file (presumably an in-memory SQLite
    database -- confirm against the SQLAlchemy SQLite dialect docs).
    """
    options = {
        "future": True,
        "echo": True,
        "connect_args": {"check_same_thread": False},
    }
    db_engine = create_async_engine('sqlite+aiosqlite://', **options)
    yield db_engine


@pytest.fixture(scope="module", autouse=True)
async def migrate_db(engine):
    """Recreate every table of ``Base.metadata``, yield, then drop them again.

    Setup and teardown each run inside their own ``engine.begin()`` block.
    """
    metadata = Base.metadata

    # Setup: start from a clean slate, then build the schema.
    async with engine.begin() as setup_conn:
        await setup_conn.run_sync(metadata.drop_all)
        await setup_conn.run_sync(metadata.create_all)

    yield

    # Teardown: remove everything the setup created.
    async with engine.begin() as teardown_conn:
        await teardown_conn.run_sync(metadata.drop_all)


@pytest.fixture(scope="module")
def async_session_maker(engine):
    """Yield an ``async_scoped_session`` registry bound to *engine*.

    Sessions are scoped by the current asyncio task (``current_task``).
    """
    session_factory = sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
    scoped_registry = async_scoped_session(session_factory, scopefunc=current_task)
    yield scoped_registry


@pytest.mark.asyncio
async def test_get(async_session_maker, migrate_db):
    """Insert two rows into ``items`` and read one back through the ORM."""

    # Advances the migrate_db async generator to its first ``yield``, which
    # runs the schema setup. Removing this line causes
    # sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: items
    # NOTE(review): this implies the fixture is handed over as an un-started
    # async generator rather than being driven by the test plugin -- confirm
    # against the installed pytest-asyncio version and mode.
    await migrate_db.__anext__()

    async with async_session_maker() as session:
        # NOTE(review): a plain string is passed to execute(); newer SQLAlchemy
        # releases may require wrapping raw SQL in text() -- confirm.
        await session.execute(
            "INSERT INTO items (id, name, quantity) VALUES "
            '(0, "crowbar", 13),'
            '(1, "lamp", 94)'
        )
        # Fetch the row with id == 1; scalar_one() is applied to the result.
        stmt = select(Item).where(Item.id == 1)
        rows = await session.execute(stmt)
        result = rows.scalar_one()

        # The loaded object must match the second inserted row.
        assert result.id == 1
        assert result.name == "lamp"
        assert result.quantity == 94

I've tried:

  • Using asyncio.wait([asyncio.create_task(migrate_db)]) inside a synchronous function, but it did not work.
  • Converting async_session_maker to an async fixture which awaits migrate_db, but that turned async_session_maker into an async generator which has to be awaited in the same way migrate_db was awaited.
  • Putting the migration into pytest_sessionstart, although that does not seem correct, because this db setup should apply only to the specified module, not the entire test suite.

What I'm expecting

To have a fixture which, once invoked, migrates the db to the state described by the declarative base, without having to be explicitly awaited in the test.

like image 840
ToJestKrzysio Avatar asked Jan 26 '26 08:01

ToJestKrzysio


1 Answers

I managed to solve the problem by putting the setup and tear-down logic into separate async functions and replacing the async fixture with a normal one which runs those coroutines in the event loop.

async def create_all(engine):
    """Drop and then create every table of ``Base.metadata`` on *engine*.

    Both steps run inside a single ``engine.begin()`` block.
    """
    metadata = Base.metadata
    async with engine.begin() as connection:
        # Order matters: clear any existing tables first, then build the schema.
        for ddl_step in (metadata.drop_all, metadata.create_all):
            await connection.run_sync(ddl_step)


async def drop_all(engine):
    """Drop every table of ``Base.metadata`` on *engine*."""
    metadata = Base.metadata
    async with engine.begin() as connection:
        await connection.run_sync(metadata.drop_all)


@pytest.fixture(scope="module", autouse=True)
def migrate_db(engine):
    """Create the schema before the module's tests and drop it afterwards.

    This is a synchronous fixture: the async helpers ``create_all`` and
    ``drop_all`` are driven to completion with ``run_until_complete``, so
    tests never have to await the fixture themselves.
    """
    # Imported locally because the surrounding code only imports
    # ``current_task`` from asyncio, which leaves the module name unbound.
    import asyncio

    loop = asyncio.get_event_loop()
    loop.run_until_complete(create_all(engine))
    # Presumably dropped so the suspended generator does not keep a
    # reference to the loop while the tests run -- confirm.
    del loop

    yield

    # The loop is looked up again instead of being reused from setup.
    # NOTE(review): the current loop may have been replaced or closed while
    # the tests ran, depending on the test plugin -- confirm.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(drop_all(engine))
    del loop
like image 52
ToJestKrzysio Avatar answered Jan 28 '26 23:01

ToJestKrzysio



Donate For Us

If you love us, you can donate to us via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!