I'm writing a custom scheduler class for celerybeat in Python, based on the celerybeat-mongo project, which works with MongoDB.
Actually, I'm trying to make this work with Couchbase instead of MongoDB. I wrote a ScheduleEntry class and a Scheduler class as well; I get the scheduler list from a Couchbase document and parse it into ScheduleEntry objects, etc.
But when I run it as described in this link, nothing seems to happen:
celery -A <my.task.file> beat -S <my.scheduler.CouchBaseScheduler>
I'm pretty new to Celery. I have already run some workers with tasks, but I don't know exactly how the scheduler works. Celerybeat starts fine, and I know that it reads my scheduler list correctly from the database, but then no task seems to be called, even though I declared the tasks in my tasks.py file.
Am I heading in the right direction? Is the command line above OK? How could I debug it, since my only way to run it is from the command line (using subprocess to start it from a script and debug it would be dirty)?
Edit: I add some details about this:
First of all I wrote a basic task in a tasks.py file:
import celery
import os
from datetime import datetime
from celery.utils.log import get_logger
def log_task_info(task_name, process_index, init_date):
    """Log one warning line describing a task invocation.

    The line has the form
    ``<task_name>: <process_index>:<init_date> : <current pid>:<current time>``.

    :param task_name: name of the task; must be a string.
    :param process_index: identifier supplied by the caller (stringified).
    :param init_date: timestamp supplied by the caller (stringified).
    """
    pieces = [
        task_name,
        ': ',
        str(process_index),
        ':',
        str(init_date),
        ' : ',
        str(os.getpid()),
        ':',
        str(datetime.now()),
    ]
    get_logger(__name__).warning(''.join(pieces))
@celery.task(name='tasks.heartbeat')
def heartbeat():
    """Log a heartbeat line for the current process and return a greeting.

    The current process id and the current time are passed to
    ``log_task_info`` as the process index and the init date.
    """
    current_pid = os.getpid()
    invoked_at = datetime.now()
    log_task_info('heartbeat', current_pid, invoked_at)
    return "Hello!"
Then I subclassed the Scheduler and ScheduleEntry classes.
class CouchBaseScheduler(Scheduler):
    """Celery beat scheduler whose schedule is loaded from Couchbase.

    The schedule is fetched through ``self.couchcel.get_scheduler_list()`` and
    refreshed at most once per ``UPDATE_INTERVAL``. Before each refresh the
    entries currently held are written back through ``sync()``.
    """

    # Minimum delay between two reloads of the schedule from Couchbase.
    UPDATE_INTERVAL = datetime.timedelta(seconds=5)

    Entry = CouchBaseScheduleEntry

    # Connection defaults, used when CELERY_COUCHBASE_SCHEDULER_URL is not
    # present in the application configuration.
    host = "192.168.59.103"
    port = "8091"
    bucket = "celery"
    doc_string = "scheduler_list"
    # NOTE(review): hard-coded credential; consider reading it from the
    # application configuration instead.
    password = "1234"

    scheduleCount = 0

    def __init__(self, *args, **kwargs):
        """Open the Couchbase bucket and initialise the base scheduler.

        Connection errors are logged and swallowed; in that case the base
        ``Scheduler.__init__`` has not run and ``self.couchcel`` is not set.
        """
        if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_BUCKET"):
            bucket_str = current_app.conf.CELERY_COUCHBASE_SCHEDULER_BUCKET
        else:
            bucket_str = "celery"
        if hasattr(current_app.conf, "CELERY_COUCHBASE_SCHEDULER_URL"):
            cnx_string = "{}/{}".format(
                current_app.conf.CELERY_COUCHBASE_SCHEDULER_URL, bucket_str)
        else:
            # NOTE(review): this fallback uses the class-level ``bucket``
            # default and ignores CELERY_COUCHBASE_SCHEDULER_BUCKET -- confirm
            # whether that is intended.
            cnx_string = "couchbase://{}:{}/{}".format(
                self.host, self.port, self.bucket)
        try:
            # From here on ``self.bucket`` is the connection object, not the
            # bucket name declared at class level.
            self.bucket = Bucket(cnx_string, password=self.password, quiet=True)
            self.couchcel = CouchBaseCelery(self.bucket, self.doc_string)
            get_logger(__name__).info("backend scheduler using %s", cnx_string)
            self._schedule = {}
            self._last_updated = None
            Scheduler.__init__(self, *args, **kwargs)
            self.max_interval = (kwargs.get('max_interval')
                                 or self.app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
                                 or 5)
        except AuthError:
            get_logger(__name__).error(
                "Couchbase connection %s failed : Auth failed!", cnx_string)
        except CouchbaseError as cbe:
            # Logged at error level: a failed connection leaves the scheduler
            # unusable and must be visible with the default log level.
            get_logger(__name__).error(
                "Couchbase connection %s failed : %s", cnx_string, type(cbe))

    def setup_schedule(self):
        """Do nothing; the schedule is loaded lazily by ``schedule``."""
        pass

    def requires_update(self):
        """Return True when the schedule was never loaded or is older than
        ``UPDATE_INTERVAL``."""
        if not self._last_updated:
            return True
        return self._last_updated + self.UPDATE_INTERVAL < datetime.datetime.now()

    def get_from_database(self):
        """Save the current entries, then fetch the scheduler list.

        :return: whatever ``self.couchcel.get_scheduler_list()`` returns, or
            ``None`` when the fetch raised (the error is logged).
        """
        self.sync()
        try:
            get_logger(__name__).info("Getting scheduler list from couchbase.")
            couch_scheduler_list = self.couchcel.get_scheduler_list()
            return couch_scheduler_list
        except Exception as e:
            get_logger(__name__).error(
                "Could not get scheduler list from couchbase: {}".format(e))
            return None

    @property
    def schedule(self):
        """Return the current schedule, reloading it first when it is stale."""
        if self.requires_update():
            get_logger(__name__).info(
                "Schedule {} requires update".format(self.scheduleCount))
            fetched = self.get_from_database()
            # Keep the previous schedule when the fetch failed, so that
            # ``sync`` never has to iterate over ``None``.
            if fetched is not None:
                self._schedule = fetched
            self._last_updated = datetime.datetime.now()
        # Return the backing attribute: returning ``self.schedule`` here would
        # re-enter this property instead of handing back the loaded schedule.
        return self._schedule

    def sync(self):
        """Save every entry of the current schedule through ``self.couchcel``."""
        for entry in self._schedule.values():
            entry.save(self.couchcel)
and
class CouchBaseScheduleEntry(ScheduleEntry):
    """Schedule entry built from one task description read from Couchbase.

    ``task`` is used as a mapping (presumably the decoded JSON document for
    one task -- confirm against the caller). ``name``, ``task`` and
    ``enabled`` are mandatory, and exactly one of ``interval`` or ``crontab``
    must be present.
    """

    def __init__(self, taskid, task):
        """Populate the entry from ``task``.

        :param taskid: identifier of the task, exposed through ``getid``.
        :param task: mapping describing the task; it is kept by reference and
            updated in place (defaults filled in, ``last_run_at`` parsed).
        :raises Exception: when a mandatory field is missing, when both or
            neither of ``interval``/``crontab`` are given, or when the
            interval period is not in ``PERIODS``.

        A ``KeyError`` raised while reading the description is logged and
        swallowed; the entry is then only partially initialised.
        """
        self._task = task
        self.app = current_app._get_current_object()
        self._id = taskid
        logger = get_logger(__name__)
        logger.info("Task id: {} processing".format(self._id))
        try:
            if not all(k in self._task for k in ('name', 'task', 'enabled')):
                raise Exception("Field name, task or enabled are mandatory!")
            self.name = self._task['name']
            self.task = self._task['task']

            # These fields are optional; an absent key falls back to an empty
            # container instead of aborting the whole entry.
            self.args = self._task.get('args', [])
            self.kwargs = self._task.get('kwargs', {})
            self.options = self._task.get('options', {})

            if 'interval' in self._task and 'crontab' in self._task:
                raise Exception("Cannot define both interval and crontab schedule")
            if 'interval' in self._task:
                interval = self._task['interval']
                if interval['period'] not in PERIODS:
                    raise Exception(
                        "The value of an interval must be {}".format(PERIODS))
                self.schedule = self._interval_schedule(
                    interval['period'], interval['every'])
                logger.info("Task contains interval")
            elif 'crontab' in self._task:
                self.schedule = self._crontab_schedule(self._task['crontab'])
                logger.info("Task contains crontab")
            else:
                raise Exception("You must define interval or crontab schedule")

            if self._task.get('total_run_count') is None:
                self._task['total_run_count'] = 0
            self.total_run_count = self._task['total_run_count']
            logger.info("Task total run count: {}".format(self.total_run_count))

            last_run_at = self._task.get('last_run_at')
            if not last_run_at:
                last_run_at = self._default_now()
            elif not isinstance(last_run_at, datetime.datetime):
                # Only parse textual values: ``next()`` rebuilds the entry
                # from a dict whose ``last_run_at`` is already a datetime.
                last_run_at = datetime.datetime.strptime(last_run_at, DATEFORMAT)
            self._task['last_run_at'] = last_run_at
            self.last_run_at = last_run_at
            logger.info("Task last run at: {}".format(self.last_run_at))
        except KeyError as ke:
            logger.error('Key not valid: {}'.format(ke))

    def _default_now(self):
        """Return the application's current time."""
        return self.app.now()

    def next(self):
        """Return a new entry for the next run of this task.

        Updates the shared task dict (last run time, run count,
        ``run_immediately`` cleared) and builds the new entry from it.
        """
        self._task['last_run_at'] = self.app.now()
        self._task['total_run_count'] += 1
        self._task['run_immediately'] = False
        get_logger(__name__).info("NEXT!")
        # The constructor takes (taskid, task); passing only the dict raised
        # a TypeError.
        return self.__class__(self._id, self._task)

    __next__ = next

    def is_due(self):
        """Return ``(is_due, next_check_delay)`` for this entry."""
        if not self._task['enabled']:
            return False, 5.0  # 5 secs delay for reenable
        # ``run_immediately`` is optional in the task description.
        if self._task.get('run_immediately'):
            # figure out when the schedule would run next anyway
            _, n = self.schedule.is_due(self.last_run_at)
            return True, n
        return self.schedule.is_due(self.last_run_at)

    def _crontab_schedule(self, crontab):
        """Build a crontab schedule from a mapping holding the five fields."""
        # ``celery.schedules.schedule`` does not take crontab fields;
        # ``crontab`` is the class that does.
        return celery.schedules.crontab(minute=crontab['minute'],
                                        hour=crontab['hour'],
                                        day_of_week=crontab['day_of_week'],
                                        day_of_month=crontab['day_of_month'],
                                        month_of_year=crontab['month_of_year'])

    def _interval_schedule(self, period, every):
        """Build an interval schedule firing every ``every`` ``period``."""
        return celery.schedules.schedule(datetime.timedelta(**{period: every}))

    def __repr__(self):
        # ``{4}`` (not ``{{4}}``) so that the schedule is actually rendered.
        return '<CouchBaseScheduleEntry ({0} {1}(*{2}, **{3}) {4})>'.format(
            self.name, self.task, self.args,
            self.kwargs, self.schedule
        )

    def reserve(self, entry):
        # NOTE(review): this calls the unbound ``Scheduler.reserve`` with an
        # entry as ``self``; it looks like it belongs on the scheduler class
        # -- confirm before relying on it.
        new_entry = Scheduler.reserve(self, entry)
        return new_entry

    @property
    def getid(self):
        """Identifier passed to the constructor."""
        return self._id

    @property
    def gettaskdict(self):
        """The task mapping backing this entry."""
        return self._task

    def tojson(self):
        """Return the JSON encoding of ``self.tocouchdict()``."""
        # NOTE(review): ``tocouchdict`` is not defined in the code shown here;
        # confirm that it exists elsewhere.
        return json.dumps(self.tocouchdict())

    def save(self, couchcel):
        """Copy the run statistics into the task dict and persist the entry.

        :param couchcel: storage helper; its ``save_scheduler`` method is
            called with this entry.
        """
        logger = get_logger(__name__)
        logger.info("Saving task {} in couchbase".format(self._id))
        if self.total_run_count > self._task['total_run_count']:
            self._task['total_run_count'] = self.total_run_count
        # Diagnostic output only, so it is logged at debug level.
        logger.debug("{}, {}".format(self.last_run_at, self._task['last_run_at']))

        stored = self._task['last_run_at']
        if stored and not isinstance(stored, datetime.datetime):
            # The stored value may still be the textual form from the document.
            stored = datetime.datetime.strptime(stored, DATEFORMAT)
        if self.last_run_at and stored and self.last_run_at > stored:
            self._task['last_run_at'] = self.last_run_at

        self._task['run_immediately'] = False
        couchcel.save_scheduler(self)
The couchcel object is used for database access; the ScheduleEntry object parses the data coming from the Couchbase document.
Best regards
The documentation on the Celery website seems to be a bit misleading. If you look here, you can see that the command line option -S sets the state database, not the scheduler for the worker.
Try running this with the --scheduler option instead:
celery -A <my.task.file> beat --scheduler <my.scheduler.CouchBaseScheduler>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With