I need to restart the Celery daemon so that the current workers shut down as their tasks complete, while a new set of workers spins up as the old ones are still shutting down.
The daemon's current graceful option waits for all tasks to complete before restarting, which is not useful when you have long-running jobs.
Please do not suggest autoreload as it is currently undocumented in 4.0.2.
Alright, what I ended up doing was using supervisord and Ansible to manage this.
[program:celery_worker]
# Number of worker instances supervisor starts (each one runs its own pool, see -c below).
numprocs=5
process_name=%(program_name)s-%(process_num)s
directory=/opt/worker/main
# Ignore this unless you want to use virtualenvs.
environment=PATH="/opt/worker/main/bin:%(ENV_PATH)s"
command=/opt/worker/bin/celery worker -n worker%(process_num)s.%%h --app=python --time-limit=3600 -c 5 -Ofair -l debug --config=celery_config -E
stdout_logfile=/var/log/celery/%(program_name)s-%(process_num)s.log
user=worker_user
autostart=true
autorestart=true
startretries=99999
startsecs=10
stopsignal=TERM
stopwaitsecs=7200
killasgroup=false
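One thing worth checking in a config like this is that stopwaitsecs is at least as large as the task time limit, since supervisor escalates to SIGKILL once stopwaitsecs elapses during a stop. The following is a minimal sketch of such a check, not part of the original setup; the function name shutdown_margin is hypothetical, and it assumes the limit is passed as --time-limit on the command line.

```python
import configparser
import re


def shutdown_margin(conf_text, section="program:celery_worker"):
    """Return stopwaitsecs minus --time-limit (seconds), or None if no limit is set.

    A negative result means supervisor could SIGKILL a worker that is
    still inside a task's allowed running time.
    """
    # interpolation=None keeps supervisor's %(name)s placeholders as plain text.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(conf_text)
    program = parser[section]
    stop_wait = program.getint("stopwaitsecs")
    match = re.search(r"--time-limit[= ](\d+)", program["command"])
    if match is None:
        return None
    return stop_wait - int(match.group(1))
```

With the values above (stopwaitsecs=7200, --time-limit=3600) the margin is 3600 seconds, so a task that starts just before the stop request still has time to finish.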
You can use supervisor to stop/start the workers to load new code, but it will wait for all of them to stop before starting them again, which does not work well for long-running jobs. It is better to just send TERM to the MainProcesses, which tells the workers to stop accepting jobs and shut down as they finish.
ps aux | grep '[c]elery.*MainProcess' | awk '{print $2}' | xargs kill -TERM
Supervisor will restart them as they die.
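If you would rather drive this from a script (for example from an Ansible task), the same pipeline can be sketched in Python. This is only an illustration under assumptions: the function names are made up, and it relies on the worker's process title containing "MainProcess" in the `ps aux` output, just as the grep above does.

```python
import os
import re
import signal
import subprocess

# Same pattern as the grep: a celery process whose title mentions MainProcess.
MAIN_PROCESS = re.compile(r"celery.*MainProcess")


def celery_main_pids(ps_output):
    """Return the PIDs (second column of `ps aux`) of Celery MainProcess lines."""
    pids = []
    for line in ps_output.splitlines():
        fields = line.split(None, 10)  # the 11th field is the full command
        if len(fields) < 11 or not fields[1].isdigit():
            continue  # header or malformed line
        if MAIN_PROCESS.search(fields[10]):
            pids.append(int(fields[1]))
    return pids


def term_main_processes():
    """Send SIGTERM to every Celery MainProcess and return the PIDs signalled."""
    ps_output = subprocess.run(
        ["ps", "aux"], capture_output=True, text=True, check=True
    ).stdout
    pids = celery_main_pids(ps_output)
    for pid in pids:
        os.kill(pid, signal.SIGTERM)
    return pids
```

Calling term_main_processes() needs to run as a user allowed to signal the workers (worker_user or root in the config above). Only the main processes are signalled, so the pool children are left to finish their current tasks.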
Of course, updating dependencies without fully stopping all the workers is close to impossible, which makes a really good case for using something like Docker. ;)
On Celery 4, I had to patch the base Task class to make it work. Source
import signal

from celery import Celery, Task
from celery.utils.log import get_task_logger

logger = get_task_logger('my_logger')


class MyBaseTask(Task):
    def __call__(self, *args, **kwargs):
        # Replace the SIGTERM handler so the signal is only logged while the task runs.
        signal.signal(
            signal.SIGTERM,
            lambda signum, frame: logger.info(
                'SIGTERM received, wait till the task finished'))
        return super().__call__(*args, **kwargs)


app = Celery('my_app')
app.Task = MyBaseTask
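To see what the handler override does outside of Celery, here is a stdlib-only sketch of the same idea. It is a variation, not the patch itself: the name defer_sigterm is hypothetical, and unlike the class above it restores the previous handler once the block finishes.

```python
import logging
import signal
from contextlib import contextmanager

logger = logging.getLogger("my_logger")


@contextmanager
def defer_sigterm():
    """Log SIGTERM instead of dying while the block runs, then restore the old handler."""
    received = []

    def handler(signum, frame):
        received.append(signum)
        logger.info("SIGTERM received, wait till the task finished")

    # signal.signal() may only be called from the main thread.
    previous = signal.signal(signal.SIGTERM, handler)
    try:
        yield received
    finally:
        # previous is None when the old handler was not installed from Python.
        restore = previous if previous is not None else signal.SIG_DFL
        signal.signal(signal.SIGTERM, restore)
```

Wrapping a long-running section in `with defer_sigterm() as received:` lets the code finish and then inspect `received` to decide whether to exit. Whether that is preferable to leaving the handler replaced, as the patch above does, depends on how your worker pool reuses its processes.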
Also, there is a patch that prevents rescheduling upon warm shutdown.