 

Celery Production Graceful Restart

Tags:

python

celery

I need to restart the celery daemon but I need it to tell the current workers to shutdown as their tasks complete and then spin up a new set of workers while the old ones are still shutting down.

The current graceful option on the daemon waits for all tasks to complete before restarting which is not useful when you have long running jobs.

Please do not suggest autoreload as it is currently undocumented in 4.0.2.

asked Nov 02 '25 by lpiner


2 Answers

What I ended up doing was using supervisord and Ansible to manage this.

[program:celery_worker]
# Number of worker processes to run.
numprocs=5
process_name=%(program_name)s-%(process_num)s
directory=/opt/worker/main
# Ignore this unless you want to use virtualenvs.
environment=PATH="/opt/worker/main/bin:%(ENV_PATH)s"
command=/opt/worker/bin/celery worker -n worker%(process_num)s.%%h --app=python --time-limit=3600 -c 5 -Ofair -l debug --config=celery_config -E
stdout_logfile=/var/log/celery/%(program_name)s-%(process_num)s.log
user=worker_user
autostart=true
autorestart=true
startretries=99999
startsecs=10
stopsignal=TERM
stopwaitsecs=7200
killasgroup=false

You can use supervisor to stop/start the workers to load new code, but it will wait for all of them to stop before starting them again, which does not work well for long-running jobs. It is better to just TERM the MainProcess of each worker, which tells it to stop accepting jobs and shut down as its running tasks finish.

ps aux | grep '[c]elery.*MainProcess' | awk '{print $2}' | xargs kill -TERM

Supervisor will restart them as they die.
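Putting the pieces together, a rolling-restart step might look like the sketch below. It assumes the supervisord config above; with `autorestart=true`, supervisor respawns each worker (on the new code) as it exits, while workers still finishing long tasks keep running the old code.

```shell
#!/bin/sh
# Warm-shutdown every running celery worker. The [c]elery bracket trick
# stops grep from matching its own command line in the ps output.
# xargs -r (GNU) skips the kill entirely when no workers are running.
ps aux | grep '[c]elery.*MainProcess' | awk '{print $2}' \
    | xargs -r kill -TERM

echo "warm shutdown signalled; supervisor will respawn workers"
```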

Of course, updating dependencies without completely stopping all the workers is pretty much impossible, which makes a really good case for using something like Docker. ;)

answered Nov 04 '25 by lpiner


On Celery 4, I had to patch the base Task class to make this work. Source

import signal

from celery import Celery, Task
from celery.utils.log import get_task_logger

logger = get_task_logger('my_logger')


class MyBaseTask(Task):
    def __call__(self, *args, **kwargs):
        # Override the default SIGTERM action so a warm shutdown only logs
        # the signal instead of interrupting the running task.
        signal.signal(
            signal.SIGTERM,
            lambda signum, frame: logger.info(
                'SIGTERM received, wait till the task finished'))
        return super().__call__(*args, **kwargs)


app = Celery('my_app')
app.Task = MyBaseTask

There is also a patch that prevents tasks from being re-queued on warm shutdown.
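The effect of the override can be seen in a standalone sketch, with no Celery involved: once a SIGTERM handler is installed, the signal no longer terminates the process, so in-flight work runs to completion. The `received` list here stands in for the logger call above.

```python
import os
import signal

received = []

# Replace the default SIGTERM action (terminate) with a handler that,
# like the logger.info call in the patched Task, merely records the signal.
signal.signal(signal.SIGTERM, lambda signum, frame: received.append(signum))

# Simulate a warm shutdown delivering SIGTERM mid-task.
os.kill(os.getpid(), signal.SIGTERM)

# The process is still alive: the handler recorded the signal, and the
# "task" below still runs to completion.
result = sum(range(10))
print(received, result)
```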

answered Nov 04 '25 by Forethinker


