I'm trying to integrate Celery into my app, but I'm getting this error: Received unregistered task of type "". The message has been ignored and discarded. My Celery app instance is created like so:
    from celery import Celery

    def make_celery(app):
        celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
        celery.conf.update(app.config)
        TaskBase = celery.Task

        class ContextTask(TaskBase):
            abstract = True

            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)

        celery.Task = ContextTask
        return celery
My tasks file looks like this:
    from flask import current_app
    from .. import celery
    from ..models.models import MobileRedemption

    @celery.task(name='process_new_redemption')
    def task_process_new_redemption(red_id):
        redemption = MobileRedemption.objects(id=red_id).first()
        if redemption:
            assert isinstance(redemption, MobileRedemption)
            print("Redemption Successful.....!")

    @celery.task(name='process_delete_redemption')
    def task_delete_redemption(red_id):
        current_app.logger.info("reached here!")
        redemption = MobileRedemption.objects(id=red_id).first()
        print(redemption)
        redemption.delete()
What am I doing wrong?
When you construct the Celery instance, pass your task modules in the include argument so that the worker imports them at startup and registers the tasks:
    celery = Celery(app.import_name,
                    broker=app.config['CELERY_BROKER_URL'],
                    include=['path.to.tasks'])
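To see why the import matters, here is a rough toy model in plain Python. It is not Celery's actual implementation: REGISTRY, task, dispatch, import_tasks_module and UnregisteredTask are all hypothetical names made up for this sketch. It only illustrates the general idea that a registering decorator runs when its module is executed, so a task module that is never imported never registers anything.

```python
import types

# Toy stand-in for a task registry. Hypothetical; not Celery's real implementation.
REGISTRY = {}


class UnregisteredTask(Exception):
    """Raised when a task name has no registered function."""


def task(name):
    """Register the decorated function under `name` when the decorator runs."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


# Body of a hypothetical tasks module. The decorators only run when it is executed.
TASKS_SOURCE = """
@task(name='process_new_redemption')
def task_process_new_redemption(red_id):
    return f'processed redemption {red_id}'


@task(name='process_delete_redemption')
def task_delete_redemption(red_id):
    return f'deleted redemption {red_id}'
"""


def import_tasks_module():
    """Simulate importing the tasks module by executing its body once."""
    module = types.ModuleType("tasks")
    module.task = task  # make the decorator visible inside the module
    exec(TASKS_SOURCE, module.__dict__)
    return module


def dispatch(name, *args):
    """Look the task up by name, roughly as a worker would for an incoming message."""
    if name not in REGISTRY:
        raise UnregisteredTask(f"Received unregistered task of type {name!r}")
    return REGISTRY[name](*args)


# Before the module is imported, the name is unknown.
first_error = None
try:
    dispatch("process_new_redemption", 42)
except UnregisteredTask as exc:
    first_error = exc

# After the import, the same message is handled.
import_tasks_module()
result = dispatch("process_new_redemption", 42)
print(first_error)
print(result)
```

In a real setup, the include list plays the role of import_tasks_module() here. If you want to check what your own app has registered, the Celery instance exposes its registry as celery.tasks, so looking for 'process_new_redemption' in there should tell you whether the module was picked up.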