I have the following project tree (for testing purposes) and I am trying to understand how Celery is loading tasks.
app
├── __init__.py
├── app.py
├── celery.py
└── my_tasks
├── __init__.py
└── tasks.py
celery.py
contains the following code for creating a Celery instance:
from celery import Celery
app = Celery("app", backend="rpc://", broker="redis://localhost:6379/0")
app.autodiscover_tasks()
tasks.py
creates a task:
from app.celery import app
from time import sleep
@app.task
def run_tests_for_hash():
# task code here
and app.py
contains a FastAPI application with one endpoint to create a task
from fastapi import FastAPI
from app.call_flow_tasks.tasks import run_tests_for_hash
app = FastAPI(title="Testing Studio runners")
@app.post("/create_task")
def create_task():
results = run_tests_for_hash.delay()
return {"task_id": results.id, "status": results.status}
but trying to run Celery worker from command line I get an error:
$celery -A app worker
Traceback (most recent call last):
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/bin/celery", line 8, in <module>
sys.exit(main())
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/__main__.py", line 16, in main
_main()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 322, in main
cmd.execute_from_commandline(argv)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 495, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 305, in execute_from_commandline
return self.handle_argv(self.prog_name, argv[1:])
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 487, in handle_argv
return self.execute(command, argv)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/celery.py", line 419, in execute
).run_from_argv(self.prog_name, argv[1:], command=argv[0])
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 223, in run_from_argv
return self(*args, **options)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/base.py", line 253, in __call__
ret = self.run(*args, **kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/bin/worker.py", line 258, in run
**kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/worker/worker.py", line 96, in __init__
self.app.loader.init_worker()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 114, in init_worker
self.import_default_modules()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/loaders/base.py", line 108, in import_default_modules
raise response
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/utils/dispatch/signal.py", line 288, in send
response = receiver(signal=self, sender=sender, **named)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 170, in __call__
return self.throw()
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/vine/promises.py", line 167, in __call__
retval = fun(*final_args, **final_kwargs)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 695, in _autodiscover_tasks
return self._autodiscover_tasks_from_fixups(related_name)
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 705, in _autodiscover_tasks_from_fixups
pkg for fixup in self._fixups
File "/Users/avlachopoulos/.venvs/test-runner-JK5CNUCV/lib/python3.7/site-packages/celery/app/base.py", line 706, in <listcomp>
for pkg in fixup.autodiscover_tasks()
AttributeError: 'NoneType' object has no attribute 'autodiscover_tasks'
Command is executed on the same level as the app
python package. If I remove autodiscover
it works as it should and workers are loaded normally. Any help on how celery autodiscovers tasks and how to load task from different modules?
You can combine two types of autodiscovers:
app.autodiscover_tasks() # Find tasks using celery.fixups (i.e. Django apps via INSTALLED_APPS).
app.autodiscover_tasks( # Add other tasks not included in the apps.
[
'project.other.tasks',
'another_project.another.app',
]
)
Celery has only one BUILTIN_FIXUPS = {'celery.fixups.django:fixup'}
, see it's code to understand.
def autodiscover_tasks(self):
from django.apps import apps
return [config.name for config in apps.get_app_configs()]
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With