
Airflow connection list check through python operator

Tags:

airflow

Before executing the DAG, I want to check whether a particular connection id is present in the connection list. I don't have any mechanism for retaining connections: even if I create a connection through the GUI, all connections are removed when the server reboots.

Following is the task I thought I should add, but I got an ASCII error when I ran it, maybe because the command returns a table that the logger cannot parse properly.

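import logging

from airflow import settings
from airflow.models import Connection
from airflow.operators.bash_operator import BashOperator


def create_connection(**kwargs):
    print(kwargs.get('ds'))

    # List the existing connections through the Airflow CLI.
    list_conn = BashOperator(
        task_id='list_connections',
        bash_command='airflow connections --l',
        xcom_push=True)

    conns = list_conn.execute(context=kwargs)
    logging.info(conns)

    # Create the connection only when the CLI returned nothing.
    if not conns:
        new_conn = Connection(conn_id='xyz', conn_type='s3',
                              host='https://api.example.com')
        session = settings.Session()
        session.add(new_conn)
        session.commit()
        logging.info('Connection is created')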

Question: Is there any way to find out, from within the Airflow DAG itself, whether the connection has already been added? If it is already there, I would not create a new connection.

Gagan asked Oct 23 '25 14:10


2 Answers

session.query(Connection) should do the trick.

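from airflow import settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator

def list_connections(**context):
    session = settings.Session()
    try:
        # Return plain conn_id strings; a raw Query object is not a usable XCom value.
        return [conn.conn_id for conn in session.query(Connection)]
    finally:
        session.close()

list_conn = PythonOperator(
    task_id='list_connections',
    python_callable=list_connections,
    provide_context=True,
)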
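Building on that query, here is a minimal sketch of a "create only if missing" check, which is what the question asks for. The function name `ensure_connection` and its `session` / `connection_cls` parameters are hypothetical and not part of Airflow; they exist only so the logic can be exercised without a metadata database. When they are omitted, the sketch assumes the Airflow 1.x import paths used elsewhere in this thread.

```python
def ensure_connection(conn_id, conn_type, host, session=None, connection_cls=None):
    """Create the connection only if no row with this conn_id exists.

    Returns True if a new connection was added, False if it was already there.
    """
    if connection_cls is None:
        # Assumption: Airflow 1.x import path, as in the rest of this thread.
        from airflow.models import Connection as connection_cls
    if session is None:
        from airflow import settings
        session = settings.Session()

    # Look for an existing row with the same conn_id.
    existing = (
        session.query(connection_cls)
        .filter(connection_cls.conn_id == conn_id)
        .first()
    )
    if existing is not None:
        return False

    # Not found: add it, mirroring the creation code from the question.
    session.add(connection_cls(conn_id=conn_id, conn_type=conn_type, host=host))
    session.commit()
    return True
```

Inside a DAG this could be passed as the `python_callable` of a `PythonOperator`, with the values supplied through `op_kwargs`, for example `{'conn_id': 'xyz', 'conn_type': 's3', 'host': 'https://api.example.com'}`.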

Please make sure all the code is contained within tasks. More precisely, it should execute at run time rather than at load time. Code placed directly at the top level of the DAG file runs every time the file is loaded, which is not recommended.
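To illustrate the load-time versus run-time distinction, here is a small sketch in plain Python with no Airflow imports. `expensive_lookup` is a hypothetical stand-in for a metadata-database query or CLI call, and `check_connections_task` is a hypothetical callable that would be handed to a `PythonOperator` as `python_callable`.

```python
calls = []  # records every time the expensive lookup actually runs


def expensive_lookup():
    # Hypothetical stand-in for a database query or CLI call.
    calls.append("lookup")
    return ["xyz"]


# Anti-pattern: a module-level statement such as `conns = expensive_lookup()`
# would run every time the DAG file is loaded.


def check_connections_task(**context):
    # Runs only when the task itself executes.
    return expensive_lookup()


# Loading this module has not triggered any lookup so far.
calls_after_load = list(calls)

# Simulate the task being executed.
result = check_connections_task()
```

After loading, `calls_after_load` is empty; the lookup is recorded only once the callable is invoked.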

nightgaunt answered Oct 26 '25 11:10


The accepted answer works perfectly. I had a scenario where I needed to get a connection by its connection id in order to create the DAG, so I had to fetch it outside of any task, during DAG creation itself. The following code worked for me:

from airflow.hooks.base_hook import BaseHook

# `connection` is the conn_id string to look up.
conn = BaseHook.get_connection(connection)

Hope this might help someone! :)
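Because `BaseHook.get_connection` raises an `AirflowException` when the conn_id is not defined, the same call can also serve as an existence check. The following is only a sketch: `connection_exists` and its `lookup` / `missing_error` parameters are hypothetical names introduced here so the logic can be tried without Airflow, and the default branch assumes the Airflow 1.x import paths.

```python
def connection_exists(conn_id, lookup=None, missing_error=LookupError):
    """Return True if the lookup finds conn_id, False if it reports it missing."""
    if lookup is None:
        # Assumption: Airflow 1.x import paths, as used in this answer.
        from airflow.exceptions import AirflowException
        from airflow.hooks.base_hook import BaseHook
        lookup, missing_error = BaseHook.get_connection, AirflowException
    try:
        lookup(conn_id)
    except missing_error:
        return False
    return True
```

Note that calling this at the top level of a DAG file performs the lookup on every load of the file, so it is worth keeping such calls to a minimum.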

Anirudh Bagri answered Oct 26 '25 11:10


