 

Flask and Celery on Heroku: sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac

I'm trying to deploy a Flask app on Heroku that uses background tasks with Celery. I've implemented the application factory pattern so that the Celery processes are not bound to any one instance of the Flask app.

This works locally, and I have yet to see an error. But when deployed to Heroku, the same result always occurs: the Celery task (I'm only using one) succeeds the first time it is run, but any subsequent calls to that task fail with sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) SSL error: decryption failed or bad record mac. If I restart the Celery worker, the cycle repeats.

There are several reported issues with this same error, but none of them describe a working solution. I initially believed that implementing the application factory pattern would prevent this error, but it clearly isn't enough on its own.

In app/__init__.py I create the celery and db objects:

from celery import Celery
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

from config import Config, config  # the app's config module (not shown in the question)

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)
db = SQLAlchemy()

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    return app
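
A common refinement of this factory pattern (not part of the asker's code) is to mirror the Flask configuration onto the Celery instance inside create_app, so both objects read their settings from the same place. A minimal sketch:

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    # Keep Celery settings (broker URL, result backend, ...) in sync
    # with the Flask configuration loaded above.
    celery.conf.update(app.config)
    return app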

My flask_celery.py file creates the actual Flask app object:

import os
from app import celery, create_app

# Create the Flask app and push an application context so that
# Flask-SQLAlchemy (db) can be used inside Celery tasks.
app = create_app(os.getenv('FLASK_CONFIG', 'default'))
app.app_context().push()

And I start celery with this command: celery worker -A app.flask_celery.celery --loglevel=info
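
On Heroku, this worker command is typically declared in the Procfile alongside the web process. A minimal sketch, assuming a hypothetical wsgi.py entry point served by Gunicorn (neither file is shown in the question):

web: gunicorn wsgi:app
worker: celery worker -A app.flask_celery.celery --loglevel=info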

This is what the actual celery task looks like:

# Imports assumed for this snippet; module paths are illustrative.
from app import celery, db
from app.models import Stuff

@celery.task()
def task_process_stuff(stuff_id):
    stuff = Stuff.query.get(stuff_id)
    stuff.processed = True
    db.session.add(stuff)
    db.session.commit()
    return stuff

Which is invoked by:

task_process_stuff.apply_async(args=[stuff.id], countdown=10)
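
For context, a call like this would normally be made from a request handler. A minimal sketch with a hypothetical Flask route (not part of the original code):

@app.route('/stuff/<int:stuff_id>/process', methods=['POST'])
def process_stuff(stuff_id):
    # Queue the background task; the worker picks it up after a 10 s delay.
    task_process_stuff.apply_async(args=[stuff_id], countdown=10)
    return '', 202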

Library Versions

  • Flask 0.12.2
  • SQLAlchemy 1.1.11
  • Flask-SQLAlchemy 2.2
  • Celery 4.0.2

1 Answer

The solution was to add db.engine.dispose() at the beginning of the task, disposing of all db connections before any work begins:

@celery.task()
def task_process_stuff(stuff_id):
    db.engine.dispose()
    stuff = Stuff.query.get(stuff_id)
    stuff.processed = True
    db.session.commit()
    return stuff

Since I need this behavior across all of my tasks, I hooked it into the task_prerun signal:

from celery.signals import task_prerun

@task_prerun.connect
def on_task_init(*args, **kwargs):
    db.engine.dispose()
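
The underlying cause is usually that database connections created in the parent process are inherited by the forked Celery worker processes, so two processes end up sharing one SSL session. An alternative sketch (not part of the accepted answer) disposes of the pool once per child process instead of before every task, using Celery's worker_process_init signal:

from celery.signals import worker_process_init

@worker_process_init.connect
def reset_db_connections(*args, **kwargs):
    # Each forked worker process starts with a fresh connection pool,
    # so it never reuses SSL-wrapped connections from the parent.
    db.engine.dispose()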

