I'm trying to deploy a Flask app on Heroku that uses Celery for background tasks. I've implemented the application factory pattern so that the Celery processes are not bound to any one instance of the Flask app.
This works locally, where I have yet to see an error. But when deployed to Heroku, the same thing always happens: the Celery task (I'm only using one) succeeds the first time it runs, but any subsequent call to that task fails with sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac. If I restart the Celery worker, the cycle repeats.
Several existing issues report this same error, but none gives a proper solution. I initially believed that implementing the application factory pattern would prevent this error, but it has not.
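In app/__init__.py I create the celery and db objects:

    from celery import Celery
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy

    # Config and config come from the project's own config module (import not shown).
    celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
    db = SQLAlchemy()

    def create_app(config_name):
        app = Flask(__name__)
        app.config.from_object(config[config_name])
        db.init_app(app)
        return app
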
My flask_celery.py file creates the actual Flask app object:

    import os
    from app import celery, create_app

    app = create_app(os.getenv('FLASK_CONFIG', 'default'))
    app.app_context().push()

And I start Celery with this command:

    celery worker -A app.flask_celery.celery --loglevel=info

This is what the actual Celery task looks like:

    @celery.task()
    def task_process_stuff(stuff_id):
        stuff = Stuff.query.get(stuff_id)
        stuff.processed = True
        db.session.add(stuff)
        db.session.commit()
        return stuff

Which is invoked by:

    task_process_stuff.apply_async(args=[stuff.id], countdown=10)

The solution was to add db.engine.dispose() at the beginning of the task, disposing of all db connections before any work begins:

    @celery.task()
    def task_process_stuff(stuff_id):
        db.engine.dispose()
        stuff = Stuff.query.get(stuff_id)
        stuff.processed = True
        db.session.commit()
        return stuff

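As I need this functionality across all of my tasks, I added it to the task_prerun signal:

    from celery.signals import task_prerun

    @task_prerun.connect
    def on_task_init(*args, **kwargs):
        # Runs before every task: drop pooled connections so the task opens a fresh one.
        db.engine.dispose()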
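
For anyone wondering why disposing helps: my understanding (an assumption, nothing in this thread confirms the root cause) is that Celery's default prefork pool can leave a forked worker process holding a database connection that was opened in a different process, and two processes sharing one SSL socket corrupt its state. Disposing the engine forces the next query to open a fresh connection. Below is a toy, standard-library-only sketch of that idea. PidAwarePool and its connect argument are hypothetical names for illustration and are not part of SQLAlchemy or Celery.

```python
import os


class PidAwarePool:
    """Toy connection pool that will not reuse connections across a fork."""

    def __init__(self, connect):
        self._connect = connect        # hypothetical factory returning a new connection
        self._idle = []                # connections available for reuse
        self._owner_pid = os.getpid()  # process that owns the idle connections

    def dispose(self):
        # Forget every pooled connection; the next checkout opens a fresh one.
        # This toy version only drops references, it does not close sockets.
        self._idle.clear()
        self._owner_pid = os.getpid()

    def checkout(self):
        # If the current process is not the one that filled the pool,
        # the idle connections were inherited from a parent: drop them.
        if os.getpid() != self._owner_pid:
            self.dispose()
        return self._idle.pop() if self._idle else self._connect()

    def checkin(self, conn):
        # Return a connection to the pool so a later checkout can reuse it.
        self._idle.append(conn)


# Usage: within one process the same connection object is reused.
pool = PidAwarePool(connect=object)
conn = pool.checkout()
pool.checkin(conn)
same_conn = pool.checkout()
```

Calling db.engine.dispose() in task_prerun is the blunt version of this: it throws the pool away before every task rather than only when the process has changed, which costs a reconnect per task but is simple.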