Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to test a custom scheduler with celerybeat?

I'm writing a custom Python scheduler class for celerybeat, based on the celerybeat-mongo project (which works with MongoDB).

Specifically, I'm trying to make it work with Couchbase instead of MongoDB. I wrote a ScheduleEntry subclass and a Scheduler subclass. The scheduler reads the list of schedule entries from a Couchbase document and parses it into ScheduleEntry objects, and so on.

But when I run it as described in this link, nothing seems to happen:

celery -A <my.task.file> beat -S <my.scheduler.CouchBaseScheduler>

I'm pretty new to Celery. I have already run some workers with tasks, but I don't know exactly how the scheduler works. Celerybeat starts fine, and I can see that it reads my schedule entries correctly from the database. After that, though, no task seems to be called, even though I defined the tasks in my tasks.py file.

Am I heading in the right direction? Is the command line above correct? How can I debug it, given that my only way to run it is from the command line? (Starting it from a script with subprocess just to debug it would be dirty.)

Edit: here are some more details:

First, I wrote a basic task in a tasks.py file:

import celery
import os
from datetime import datetime
from celery.utils.log import get_logger

def log_task_info(task_name, process_index, init_date):
    """Write one WARNING-level trace line describing a task invocation.

    The emitted line looks like
    ``<task_name>: <process_index>:<init_date> : <pid>:<now>``, where
    ``<pid>`` is the id of the current process and ``<now>`` is the local
    time at which the line is produced.
    """
    details = '{}:{} : {}:{}'.format(
        str(process_index),
        str(init_date),
        str(os.getpid()),
        str(datetime.now()),
    )
    # Plain concatenation: task_name is expected to already be a str.
    message = task_name + ': ' + details
    get_logger(__name__).warning(message)

@celery.task(name='tasks.heartbeat')
def heartbeat():
    """Log a heartbeat trace line for this worker process and return a greeting."""
    worker_pid = os.getpid()
    started_at = datetime.now()
    log_task_info('heartbeat', worker_pid, started_at)
    return "Hello!"

Then I subclassed the Scheduler and ScheduleEntry classes.

class CouchBaseScheduler(Scheduler):
    """celerybeat scheduler whose entries are stored in a Couchbase document.

    Entries are loaded lazily through the ``schedule`` property. They are
    reloaded from Couchbase at most once per ``UPDATE_INTERVAL``. Before each
    reload, the current in-memory entries are written back via ``sync``.
    """

    # Minimum delay between two reloads of the entry list from Couchbase.
    UPDATE_INTERVAL = datetime.timedelta(seconds=5)

    # Entry class used to represent each schedule item.
    Entry = CouchBaseScheduleEntry

    # Fallback connection settings, used when the app config does not define
    # CELERY_COUCHBASE_SCHEDULER_URL / CELERY_COUCHBASE_SCHEDULER_BUCKET.
    # NOTE(review): host and password are hard-coded; consider moving them
    # into the Celery configuration.
    host = "192.168.59.103"
    port = "8091"
    bucket = "celery"  # default bucket name; replaced by the Bucket object in __init__
    doc_string = "scheduler_list"  # key of the document holding the entries
    password = "1234"
    scheduleCount = 0  # only used in the "requires update" log message

    def __init__(self, *args, **kwargs):
        """Connect to Couchbase, then run the base ``Scheduler`` initializer.

        The connection string is ``CELERY_COUCHBASE_SCHEDULER_URL/<bucket>``
        when that setting exists. Otherwise it is
        ``couchbase://<host>:<port>/<bucket>``. In both cases ``<bucket>`` is
        ``CELERY_COUCHBASE_SCHEDULER_BUCKET``, falling back to the class-level
        ``bucket``.

        Raises:
            AuthError: authentication against the bucket failed.
            CouchbaseError: any other Couchbase connection failure.
        """
        logger = get_logger(__name__)
        conf = current_app.conf
        if hasattr(conf, "CELERY_COUCHBASE_SCHEDULER_BUCKET"):
            bucket_str = conf.CELERY_COUCHBASE_SCHEDULER_BUCKET
        else:
            bucket_str = self.bucket
        if hasattr(conf, "CELERY_COUCHBASE_SCHEDULER_URL"):
            cnx_string = "{}/{}".format(conf.CELERY_COUCHBASE_SCHEDULER_URL, bucket_str)
        else:
            # Use the configured bucket here too. Previously the configured
            # bucket name was ignored in this branch.
            cnx_string = "couchbase://{}:{}/{}".format(self.host, self.port, bucket_str)

        try:
            self.bucket = Bucket(cnx_string, password=self.password, quiet=True)
            self.couchcel = CouchBaseCelery(self.bucket, self.doc_string)
        except AuthError:
            logger.error("Couchbase connection %s failed : Auth failed!", cnx_string)
            # Without a connection the scheduler cannot work. Re-raise instead
            # of returning a half-initialised object that fails later with an
            # unrelated AttributeError.
            raise
        except CouchbaseError as cbe:
            logger.error("Couchbase connection %s failed : %s", cnx_string, cbe)
            raise

        logger.info("backend scheduler using %s", cnx_string)
        # Initialise the local cache before the base initializer runs, so these
        # attributes exist whatever the base class touches during setup.
        self._schedule = {}
        self._last_updated = None
        Scheduler.__init__(self, *args, **kwargs)
        self.max_interval = (kwargs.get('max_interval')
                             or self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or 5)

    def setup_schedule(self):
        """No-op: entries are loaded lazily from Couchbase by ``schedule``."""

    def requires_update(self):
        """Return True if the schedule was never loaded or is older than UPDATE_INTERVAL."""
        if not self._last_updated:
            return True
        return self._last_updated + self.UPDATE_INTERVAL < datetime.datetime.now()

    def get_from_database(self):
        """Persist the current entries, then fetch the entry collection from Couchbase.

        Returns:
            Whatever ``self.couchcel.get_scheduler_list()`` returns, or
            ``None`` if the fetch raised. The return value is presumably a
            mapping of name -> CouchBaseScheduleEntry, since ``sync`` iterates
            ``.values()`` on it (TODO confirm).
        """
        self.sync()
        logger = get_logger(__name__)
        try:
            logger.info("Getting scheduler list from couchbase.")
            return self.couchcel.get_scheduler_list()
        except Exception as e:
            logger.error("Could not get scheduler list from couchbase: {}".format(e))
            return None

    @property
    def schedule(self):
        """Return the current entries, reloading them from Couchbase when stale."""
        if self.requires_update():
            get_logger(__name__).info("Schedule {} requires update".format(self.scheduleCount))
            fresh = self.get_from_database()
            # Keep the previous entries if the fetch failed, rather than
            # replacing them with None (which would break sync()).
            if fresh is not None:
                self._schedule = fresh
            self._last_updated = datetime.datetime.now()
        # Return the cached value. Returning self.schedule here would recurse
        # into this property forever.
        return self._schedule

    def sync(self):
        """Write every in-memory entry back to Couchbase."""
        for entry in (self._schedule or {}).values():
            entry.save(self.couchcel)

and

class CouchBaseScheduleEntry(ScheduleEntry):
    """Schedule entry built from one task dict of the Couchbase scheduler document.

    The task dict must contain ``name``, ``task`` and ``enabled``. The
    ``args``, ``kwargs`` and ``options`` keys are also read. The dict must
    contain exactly one schedule definition:

    * ``interval``: a dict with ``period`` (one of ``PERIODS``) and ``every``.
    * ``crontab``: a dict with ``minute``, ``hour``, ``day_of_week``,
      ``day_of_month`` and ``month_of_year``.

    ``total_run_count``, ``last_run_at`` and ``run_immediately`` are optional
    bookkeeping fields.
    """

    def __init__(self, taskid, task):
        """Parse *task* into entry attributes.

        Args:
            taskid: identifier of the task document in Couchbase.
            task: the raw task dict. It is updated in place: a missing
                ``total_run_count`` becomes 0, and ``last_run_at`` is
                normalised to a datetime.

        Raises:
            Exception: a mandatory field is missing, both or neither of
                ``interval``/``crontab`` are given, or the interval period is
                invalid.

        A missing ``args``/``kwargs``/``options``/schedule sub-key (KeyError)
        is logged and leaves the entry partially initialised.
        """
        logger = get_logger(__name__)
        self._task = task

        self.app = current_app._get_current_object()
        self._id = taskid
        logger.info("Task id: {} processing".format(self._id))
        try:
            if not all(k in self._task for k in ('name', 'task', 'enabled')):
                raise Exception("Field name, task or enabled are mandatory!")
            self.name = self._task['name']
            self.task = self._task['task']

            self.args = self._task['args']
            self.kwargs = self._task['kwargs']
            self.options = self._task['options']

            if 'interval' in self._task and 'crontab' in self._task:
                raise Exception("Cannot define both interval and crontab schedule")
            if 'interval' in self._task:
                interval = self._task['interval']
                if interval['period'] not in PERIODS:
                    raise Exception("The value of an interval must be {}".format(PERIODS))
                self.schedule = self._interval_schedule(interval['period'], interval['every'])
                logger.info("Task contains interval")
            elif 'crontab' in self._task:
                self.schedule = self._crontab_schedule(self._task['crontab'])
                logger.info("Task contains crontab")
            else:
                raise Exception("You must define interval or crontab schedule")

            # A missing or null counter means the task never ran.
            if self._task.get('total_run_count') is None:
                self._task['total_run_count'] = 0
            self.total_run_count = self._task['total_run_count']
            logger.info("Task total run count: {}".format(self.total_run_count))

            last_run_at = self._task.get('last_run_at')
            if not last_run_at:
                last_run_at = self._default_now()
            elif not isinstance(last_run_at, datetime.datetime):
                # Stored documents hold a DATEFORMAT string, but next() passes
                # the dict back with a datetime already in place. Only strings
                # need parsing; strptime on a datetime raises TypeError.
                last_run_at = datetime.datetime.strptime(last_run_at, DATEFORMAT)
            self._task['last_run_at'] = last_run_at
            self.last_run_at = last_run_at
            logger.info("Task last run at: {}".format(self.last_run_at))
        except KeyError as ke:
            # Best effort: report the bad entry through the logger (print
            # output is easily lost when beat runs as a daemon).
            logger.error('Key not valid: {}'.format(ke))

    def _default_now(self):
        """Return the app's current time, used when the task never ran."""
        return self.app.now()

    def next(self):
        """Record a run in the task dict and return a fresh entry built from it."""
        self._task['last_run_at'] = self.app.now()
        self._task['total_run_count'] += 1
        self._task['run_immediately'] = False
        get_logger(__name__).info("NEXT!")
        # The constructor needs the task id as well as the dict.
        return self.__class__(self._id, self._task)

    __next__ = next

    def is_due(self):
        """Return ``(is_due, next_check_seconds)`` for this entry.

        A disabled entry is never due and is re-checked after 5 seconds. An
        entry flagged ``run_immediately`` is due now; the next-check delay is
        still taken from the real schedule.
        """
        if not self._task['enabled']:
            return False, 5.0  # 5 secs delay for reenable
        if self._task.get('run_immediately'):
            # figure out when the schedule would run next anyway
            _, n = self.schedule.is_due(self.last_run_at)
            return True, n
        return self.schedule.is_due(self.last_run_at)

    def _crontab_schedule(self, crontab):
        """Build a crontab schedule from the stored crontab dict."""
        # celery.schedules.schedule only takes a run_every interval; crontab
        # fields must go to celery.schedules.crontab.
        return celery.schedules.crontab(minute=crontab['minute'],
                                        hour=crontab['hour'],
                                        day_of_week=crontab['day_of_week'],
                                        day_of_month=crontab['day_of_month'],
                                        month_of_year=crontab['month_of_year'])

    def _interval_schedule(self, period, every):
        """Build an interval schedule of ``every`` units of ``period`` (a timedelta keyword)."""
        return celery.schedules.schedule(datetime.timedelta(**{period: every}))

    def __repr__(self):
        # The previous '{{4}}' produced a literal "{4}" instead of the schedule.
        return '<CouchBaseScheduleEntry ({0} {1}(*{2}, **{3}) {4})>'.format(
            self.name, self.task, self.args,
            self.kwargs, self.schedule
        )

    def reserve(self, entry):
        # NOTE(review): reserve is a Scheduler method; calling it with an
        # entry as ``self`` looks misplaced. Confirm whether it is used at all.
        new_entry = Scheduler.reserve(self, entry)
        return new_entry

    @property
    def getid(self):
        """Identifier of the task document in Couchbase."""
        return self._id

    @property
    def gettaskdict(self):
        """The raw (normalised) task dict backing this entry."""
        return self._task

    def tojson(self):
        # NOTE(review): tocouchdict is not defined on this class as shown.
        # Confirm it exists, otherwise this raises AttributeError.
        return json.dumps(self.tocouchdict())

    def save(self, couchcel):
        """Merge this entry's run bookkeeping into the task dict and persist it.

        The stored counter and timestamp are only moved forward, never back.
        ``run_immediately`` is cleared before saving through ``couchcel``.
        """
        logger = get_logger(__name__)
        logger.info("Saving task {} in couchbase".format(self._id))
        if self.total_run_count > self._task['total_run_count']:
            self._task['total_run_count'] = self.total_run_count
        # Diagnostic trace: previously logged at ERROR level by mistake.
        logger.debug("{}, {}".format(self.last_run_at, self._task['last_run_at']))
        try:
            if self.last_run_at and self._task['last_run_at'] \
                    and self.last_run_at > self._task['last_run_at']:
                self._task['last_run_at'] = self.last_run_at
        except TypeError:
            # The stored value is still a DATEFORMAT string; parse it to compare.
            if self.last_run_at and self._task['last_run_at'] \
                    and self.last_run_at > datetime.datetime.strptime(self._task['last_run_at'], DATEFORMAT):
                self._task['last_run_at'] = self.last_run_at
        self._task['run_immediately'] = False
        couchcel.save_scheduler(self)

The couchcel object is used for database access; the ScheduleEntry object parses the data coming from the Couchbase document.

Best regards

like image 654
onizukaek Avatar asked Dec 31 '25 19:12

onizukaek


1 Answers

The documentation on the Celery website seems to be a bit misleading. If you look here, you can see that the command line option -S sets the state database, not the scheduler for the worker.

Try running this with the --scheduler option instead:

celery -A <my.task.file> beat --scheduler <my.scheduler.CouchBaseScheduler>
like image 117
kellanburket Avatar answered Jan 02 '26 08:01

kellanburket



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!