Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Autodiscovering celery tasks

Tags:

python

celery

I have the following project tree (for testing purposes) and I am trying to understand how Celery is loading tasks.

app
├── __init__.py
├── app.py
├── celery.py
└── my_tasks
    ├── __init__.py
    └── tasks.py

celery.py contains the following code for creating a Celery instance:

from celery import Celery

app = Celery("app", backend="rpc://", broker="redis://localhost:6379/0")
app.autodiscover_tasks()

tasks.py creates a task:

from app.celery import app
from time import sleep


@app.task
def run_tests_for_hash():
   # task code here

and app.py contains a FastAPI application with one endpoint to create a task

from fastapi import FastAPI
from app.call_flow_tasks.tasks import run_tests_for_hash

app = FastAPI(title="Testing Studio runners")

@app.post("/create_task")
def create_task():

    results = run_tests_for_hash.delay()

    return {"task_id": results.id, "status": results.status}

but trying to run Celery worker from command line I get an error:

$celery -A app worker
Traceback (most recent call last):
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/bin/celery", line 8, in <module>
    sys.exit(main())
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/__main__.py", line 16, in main
    _main()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 322, in main
    cmd.execute_from_commandline(argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 495, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 487, in handle_argv
    return self.execute(command, argv)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 419, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 223, in run_from_argv
    return self(*args, **options)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 253, in __call__
    ret = self.run(*args, **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 258, in run
    **kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/worker/worker.py", line 96, in __init__
    self.app.loader.init_worker()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 114, in init_worker
    self.import_default_modules()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 108, in import_default_modules
    raise response
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/utils/dispatch/signal.py", line 288, in send
    response = receiver(signal=self, sender=sender, **named)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
    return self.throw()
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
    retval = fun(*final_args, **final_kwargs)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 695, in _autodiscover_tasks
    return self._autodiscover_tasks_from_fixups(related_name)
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 705, in _autodiscover_tasks_from_fixups
    pkg for fixup in self._fixups
  File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 706, in <listcomp>
    for pkg in fixup.autodiscover_tasks()
AttributeError: 'NoneType' object has no attribute 'autodiscover_tasks'

Command is executed on the same level as the app python package. If I remove autodiscover it works as it should and workers are loaded normally. Any help on how celery autodiscovers tasks and how to load task from different modules?

like image 619
Apostolos Avatar asked Oct 17 '25 13:10

Apostolos


1 Answers

You can combine two types of autodiscovers:

app.autodiscover_tasks()  # Find tasks using celery.fixups (i.e. Django apps via INSTALLED_APPS).
app.autodiscover_tasks(  # Add other tasks not included in the apps.
    [
        'project.other.tasks',
        'another_project.another.app',
    ]
)

Celery has only one BUILTIN_FIXUPS = {'celery.fixups.django:fixup'}, see it's code to understand.

    def autodiscover_tasks(self):
        from django.apps import apps
        return [config.name for config in apps.get_app_configs()]
like image 76
frost-nzcr4 Avatar answered Oct 19 '25 03:10

frost-nzcr4



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!