Using Celery ver.3.1.23, I am trying to dynamically add a scheduled task to celery beat. I have one celery worker and one celery beat instance running.
Triggering a standard celery task by running task.delay() works fine. When I define a scheduled periodic task as a setting in the configuration, celery beat runs it.
However, what I need is to be able to add a task at runtime that runs on a specified crontab schedule. After adding a task to the persistent scheduler, celery beat doesn't seem to detect the newly added task. I can see that the celerybeat-schedule file does have an entry for the new task.
Code:
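from celery import current_app
from celery.beat import PersistentScheduler
from celery.schedules import crontab

scheduler = PersistentScheduler(app=current_app, schedule_filename='celerybeat-schedule')
scheduler.add(name="adder",
              task="app.tasks.add",
              schedule=crontab(minute='*/1'),
              args=(1, 2))
scheduler.close()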
When I run:
print(scheduler.schedule)
I get:
{'celery.backend_cleanup': <Entry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>,
'adder': <Entry: adder app.tasks.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>}
Note that app.tasks.add has the @celery.task decorator.
With the django-celery-beat database scheduler, Celery beat stores all periodic tasks in the PeriodicTask table. A task can be scheduled in several ways, including crontab and interval schedules.
In order to dynamically add a scheduled task, create the desired type of schedule object (CrontabSchedule in the example below), and pass it into a new PeriodicTask object.
from django_celery_beat.models import PeriodicTask, CrontabSchedule

# Inside the function where you want to add the task dynamically
schedule = CrontabSchedule.objects.create(minute='*/1')
PeriodicTask.objects.create(
    name='adder',
    task='app.tasks.add',  # registered task name, as given in the question
    crontab=schedule,
)
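The snippet above does not pass the (1, 2) arguments from the question and creates a new schedule row on every call. Below is a minimal sketch of one way to handle both, assuming django-celery-beat is installed and beat is started with its DatabaseScheduler. The helper names crontab_fields and add_periodic_task are hypothetical and not part of the library.

```python
import json


def crontab_fields(expr):
    """Split a five-field cron expression into CrontabSchedule field kwargs."""
    # Standard cron order: minute, hour, day of month, month, day of week.
    minute, hour, day_of_month, month_of_year, day_of_week = expr.split()
    return {
        "minute": minute,
        "hour": hour,
        "day_of_month": day_of_month,
        "month_of_year": month_of_year,
        "day_of_week": day_of_week,
    }


def add_periodic_task(name, task, cron_expr, args=()):
    """Create or update a crontab-scheduled PeriodicTask (hypothetical helper)."""
    # Imported lazily so this module can be loaded without Django configured.
    from django_celery_beat.models import CrontabSchedule, PeriodicTask

    # Reuse an existing schedule row with the same fields instead of duplicating it.
    schedule, _ = CrontabSchedule.objects.get_or_create(**crontab_fields(cron_expr))
    periodic_task, _ = PeriodicTask.objects.update_or_create(
        name=name,  # PeriodicTask names are unique, so the name works as the lookup key
        defaults={
            "task": task,
            "crontab": schedule,
            # PeriodicTask.args stores positional arguments as a JSON-encoded list.
            "args": json.dumps(list(args)),
        },
    )
    return periodic_task
```

For the task in the question, the call would look like add_periodic_task("adder", "app.tasks.add", "*/1 * * * *", args=(1, 2)).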
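Instead of trying to find a good workaround, I suggest you switch to RedBeat (celery-redbeat), a Celery beat scheduler that keeps the schedule in Redis so that entries can be added at runtime.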
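A rough sketch of what adding an entry with RedBeat could look like, assuming the celery-redbeat package is installed, the app sets redbeat_redis_url, and beat is started with -S redbeat.RedBeatScheduler. The helper add_redbeat_entry and its entry_cls parameter are hypothetical; entry_cls exists only so the function can be exercised without a Redis server.

```python
def add_redbeat_entry(app, name, task, schedule, args=(), entry_cls=None):
    """Save a schedule entry that a running RedBeat scheduler can pick up."""
    if entry_cls is None:
        # Imported lazily so this module loads even where redbeat is not installed.
        from redbeat import RedBeatSchedulerEntry as entry_cls

    # schedule is a Celery schedule object, e.g. crontab(minute='*/1').
    entry = entry_cls(name, task, schedule, args=list(args), app=app)
    entry.save()  # writes the entry to Redis
    return entry
```

With the names from the question, this would be called as add_redbeat_entry(current_app, "adder", "app.tasks.add", crontab(minute='*/1'), args=(1, 2)).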