Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use AsyncSession from sqlalchemy in celery tasks?

Use AsyncSession in celery tasks

I use fastapi and sqlalchemy, I must create celery task, that will go to the database and check does any objects of my Event (table) has end_time < datetime.now()

There is my code:

@asynccontextmanager
async def scoped_session():
    scoped_factory = async_scoped_session(
        async_session,
        scopefunc=asyncio.current_task()
    )
    try:
        async with scoped_factory() as s:
            yield s
    finally:
        await scoped_factory().remove()


async def logic():
    async with scoped_session() as session:
        stmt = select(event.models.Event).where(
            event.models.Event.end_time <= datetime.now()
        )
        results = await session.execute(stmt)
        for res in results.fetchall():
            print(res.is_event_done)


@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self):
    asyncio.run(logic())

here is my async_session

engine = create_async_engine(settings.db_url, echo=True)

async_session = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

so I got \'_asyncio.Task\' object is not callable

like image 783
jabajke Avatar asked Oct 15 '25 02:10

jabajke


1 Answers

I just do like this

async def update_event() -> None:
    async with async_session() as session:
        stmt = update(event.models.Event).where(
            event.models.Event.end_time <= datetime.now(),
            event.models.Event.is_active is True
        ).values(is_done=True)
        await session.execute(stmt)


@celery.task(name='is_event_done', bind=True, ignore_result=True)
def is_event_done(self) -> None:
    loop.run_until_complete(update_event())

and now its work fine, I hope that its a good solution If there is anything wrong let me know, thanks!

like image 162
jabajke Avatar answered Oct 17 '25 15:10

jabajke



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!