
Python: Execute long-running task in a background process forking off FastAPI app [duplicate]

TL;DR

In my FastAPI app (run under gunicorn/uvicorn), I need to execute some long-running tasks in a completely non-blocking way, so that the main asyncio event loop is not affected. The only approach I can think of is spinning off separate processes to fire the tasks, then somehow collecting the results and signalling the main loop. So basically, my workflow should look something like:

1. Fire task in separate process (do ffmpeg video encoding and save files/data).
2. Forget about the running process and do other stuff in a normal way.
3. Get "I'm done" signal from the process(es) and check for errors.
4. Handle results.
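Conceptually, something like this rough multiprocessing sketch, just to illustrate the shape I'm after (encode_videos, the callbacks and the payload are placeholders, not code from my app):

import multiprocessing as mp

def encode_videos(job):
    # 1. runs in a separate process: ffmpeg encoding, file ops, DB writes
    return {'ok': True, 'job': job}

def on_done(result):
    # 3./4. called back in the parent once the worker process finishes
    print('done:', result)

def on_error(exc):
    # 3. called back in the parent if the worker process raised
    print('failed:', exc)

if __name__ == '__main__':
    job = {'src': 'input.mov'}  # placeholder payload
    pool = mp.Pool()
    # 2. fire and forget; in the real app the event loop keeps serving requests here
    pool.apply_async(encode_videos, (job,), callback=on_done, error_callback=on_error)
    pool.close()
    pool.join()  # only so this standalone sketch doesn't exit before the callback fires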

The long-running stuff

My long-running task calls the ffmpeg video encoder on some files, actually using asyncio.subprocess to fork an external ffmpeg process. It then does some file operations on the resulting files and stores some data in the app's database. The code looks as follows (simplified version):

import ffmpeg  # ffmpeg-python (https://kkroening.github.io/ffmpeg-python/)
import asyncio
from typing import Optional, Union
from pydantic import BaseModel

class ProcessResultModel(BaseModel):
    returncode: Optional[int] = None
    stdout: str = ''
    stderr: str = ''

    class Config:
        arbitrary_types_allowed = True

@ffmpeg.nodes.output_operator()
async def run_async_async(stream_spec, cmd='ffmpeg', pipe_stdin=False, pipe_stdout=False,
                          pipe_stderr=False, quiet=False, overwrite_output=False,
                          run: bool = True) -> Union[asyncio.subprocess.Process, ProcessResultModel]:
    # compile ffmpeg args
    args = ffmpeg._run.compile(stream_spec, cmd, overwrite_output=overwrite_output)
    # pipe streams as required
    stdin_stream = asyncio.subprocess.PIPE if pipe_stdin else None
    stdout_stream = asyncio.subprocess.PIPE if pipe_stdout or quiet else None
    stderr_stream = asyncio.subprocess.PIPE if pipe_stderr or quiet else None
    # create subprocess (ffmpeg)
    process = await asyncio.create_subprocess_exec(*args, stdin=stdin_stream,
                                                    stdout=stdout_stream, stderr=stderr_stream)
    # if not told to run, simply return the process object
    if not run: return process
    # run process and collect results
    stdout, stderr = await process.communicate()
    # return results in a nice object
    return ProcessResultModel(returncode=process.returncode,
                              stdout=stdout.decode('utf-8') if stdout else '',
                              stderr=stderr.decode('utf-8') if stderr else '')
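
For reference, stream_spec is just a regular ffmpeg-python stream, built e.g. like this (file names and codec settings below are placeholders):

import ffmpeg

# hypothetical input/output; a real app would derive these from request data
stream = ffmpeg.input('input.mov').output('output.mp4', vcodec='libx264', crf=23)
# which is then run through the operator defined above, e.g.:
# result = await stream.run_async_async(quiet=True, overwrite_output=True)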

The problem

If I simply call this in my FastAPI CRUD function as is:

async def fire_task(stream):
    res = await stream.run_async_async(run=True)

it will call process.communicate() and effectively block my main event loop until the entire task is done. If I call it with run=False, it will just return the initialized process, which I would then have to run and await somewhere myself.

Is there a way to fire-and-forget the process without blocking the event loop, and then at some point get a signal that it's done and collect the results, in a safe and robust manner?



1 Answer

So after perusing the replies to similar questions mentioned here (thanks @Chris), I finally managed to put together a working solution. See below.

[1] FastAPI App lifespan

Instantiate a global singleton thread pool in the app's lifespan:

from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from fastapi import FastAPI

POOL_MAX_THREADS = 20

@asynccontextmanager
async def lifespan(app: FastAPI):
    # create thread pool for long-runners
    pool = ThreadPoolExecutor(max_workers=POOL_MAX_THREADS)
    
    # do other initialization / regular tasks
    await on_startup_single(app)
    await regular_tasks_5min()

    # yield globals (will be accessible in requests)
    yield {'pool': pool}

    # do shutdown activities
    await on_shutdown_single(app)
    pool.shutdown()
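
The app itself just needs to be created with this lifespan. The dict yielded above becomes the request state, which is how the endpoints below get hold of the pool:

from fastapi import FastAPI

app = FastAPI(lifespan=lifespan)
# request.state.pool now resolves to the ThreadPoolExecutor created in lifespan()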

[2] FastAPI Endpoint(s)

Pass the global pool from the endpoints down to the underlying CRUD functions via the Request object:

from fastapi import APIRouter, Request, Body
import crud  # app's CRUD functions

router = APIRouter()

@router.post('/do-stuff')
async def do_stuff(request: Request, data: dict = Body(...)):
    # pass global thread pool in request state
    await crud.do_stuff(data, request.state.pool)
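
A quick way to check the wiring (assuming the router is included in app; the payload is just a placeholder):

from fastapi.testclient import TestClient

# use the client as a context manager so the lifespan runs and request.state.pool exists
with TestClient(app) as client:
    resp = client.post('/do-stuff', json={'video_id': 42})  # placeholder payload
    print(resp.status_code)  # returns immediately; the task keeps running in the pool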

[3] CRUD function

Prepare params and fire long-running task in the thread pool:

from concurrent.futures import Executor, ThreadPoolExecutor
import asyncio

POOL_MAX_THREADS = 20

# the long-running worker we need to run in a non-blocking fashion
# (one(), two(), three() stand in for the actual encoding / file / DB steps)
async def very_long_task(data):
    await one()
    await two()
    return await three()

# the task-spawning function
async def do_stuff(data: dict, pool: Executor = None):
    # we may also choose to execute in a default pool
    if pool is None: pool = ThreadPoolExecutor(max_workers=POOL_MAX_THREADS)
    # get main event loop
    loop = asyncio.get_running_loop()
    # fire task and return immediately, leaving the task to run in thread pool
    loop.run_in_executor(pool, lambda: asyncio.run(very_long_task(data)))

That's basically it!
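
If you also need the "I'm done" signal and error handling from the original workflow, one option (a sketch; the handler names are placeholders) is to keep the future that run_in_executor returns and attach a done-callback. The callback is invoked on the event loop once the worker thread finishes:

def _on_done(fut: asyncio.Future):
    try:
        result = fut.result()   # re-raises any exception from very_long_task
        handle_result(result)   # placeholder result handler
    except Exception as exc:
        log_task_error(exc)     # placeholder error handler

future = loop.run_in_executor(pool, lambda: asyncio.run(very_long_task(data)))
future.add_done_callback(_on_done)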

PS.

I couldn't get the code working with the other suggested solutions, including using a ProcessPoolExecutor instead of a ThreadPoolExecutor, or an asyncio.Queue consumer that takes tasks off the queue as they appear.
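
For what it's worth, the lambda passed to run_in_executor is one likely reason the ProcessPoolExecutor variant fails: anything submitted to a process pool must be picklable, and lambdas are not. A picklable variant might look like this (untested here; data and the task's dependencies must also be picklable/importable):

# module-level function, so it can be pickled and shipped to a worker process
def run_very_long_task(data: dict):
    return asyncio.run(very_long_task(data))

# then, inside do_stuff:
# loop.run_in_executor(process_pool, run_very_long_task, data)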
