Before executing the DAG, I want to check whether a particular connection id is present in the connection list. I don't have any mechanism for retaining connections: even if I create one through the GUI, all connections are removed when the server reboots.
Following is the task I thought I should add, but I got an ASCII error when I ran it, maybe because the command returns a table that the logger cannot parse properly.
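    import logging

    from airflow import settings
    from airflow.models import Connection
    from airflow.operators.bash_operator import BashOperator

    def create_connection(**kwargs):
        print(kwargs.get('ds'))
        # List the existing connections by shelling out to the Airflow CLI
        list_conn = BashOperator(
            task_id='list_connections',
            bash_command='airflow connections --list',
            xcom_push=True)
        conns = list_conn.execute(context=kwargs)
        logging.info(conns)
        # Create the connection only if the CLI returned nothing
        if not conns:
            new_conn = Connection(conn_id='xyz', conn_type='s3',
                                  host='https://api.example.com')
            session = settings.Session()
            session.add(new_conn)
            session.commit()
            logging.info('Connection is created')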
Question: Is there any way to find out, from within the Airflow DAG itself, whether the connection has already been added? If it is already there, I would not create a new one.
session.query(Connection) should do the trick.
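    from airflow import settings
    from airflow.models import Connection
    from airflow.operators.python_operator import PythonOperator

    def list_connections(**context):
        session = settings.Session()
        # Return plain conn_id strings so the result can be pushed to XCom
        return [conn.conn_id for conn in session.query(Connection)]

    list_conn = PythonOperator(
        task_id='list_connections',
        python_callable=list_connections,
        provide_context=True,
    )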
Please make sure all the code is contained within tasks. Or, to phrase it correctly, it should execute at run time instead of load time. Adding the code directly to the DAG file causes it to run at load time, which is not recommended.
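To tie this back to the question, here is a minimal sketch of a check-then-create helper, assuming the same `Connection` model and `settings.Session()` used above. The names `ensure_connection` and `create_connection_task` are hypothetical, and the session and model are passed in as arguments only so the helper can be exercised without a live metadata database.

```python
def ensure_connection(session, connection_cls, conn_id, **conn_kwargs):
    """Add a connection row only if conn_id is not present yet.

    Returns True when a new row was created, False when one already existed.
    """
    existing = (
        session.query(connection_cls)
        .filter(connection_cls.conn_id == conn_id)
        .first()
    )
    if existing is not None:
        return False
    session.add(connection_cls(conn_id=conn_id, **conn_kwargs))
    session.commit()
    return True


def create_connection_task(**context):
    # Airflow imports live inside the callable so that nothing touches
    # the metadata database while the DAG file is being parsed.
    from airflow import settings
    from airflow.models import Connection

    session = settings.Session()
    try:
        # 'xyz', 's3' and the host are the example values from the question
        return ensure_connection(
            session, Connection, 'xyz',
            conn_type='s3', host='https://api.example.com',
        )
    finally:
        session.close()
```

It could be wired in with `PythonOperator(task_id='create_connection', python_callable=create_connection_task, provide_context=True)`, so the check runs at run time rather than at load time.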
The accepted answer works perfectly. I had a scenario where I needed to get a connection by its connection id in order to create the DAG, so I had to fetch it outside the task, during DAG creation itself. The following code worked for me:
    from airflow.hooks.base_hook import BaseHook

    # `connection` is the conn_id string to look up
    conn = BaseHook.get_connection(connection)
Hope this might help someone! :)
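Since `BaseHook.get_connection` raises an `AirflowException` when the id is not defined, the same call can double as an existence check. The sketch below is only one way to wrap it: `connection_exists` is a hypothetical name, and the optional `get_connection` argument (expected to raise `LookupError` for unknown ids) is an assumption added so the function can be tried without Airflow installed.

```python
def connection_exists(conn_id, get_connection=None):
    """Return True if conn_id resolves to a connection, False otherwise."""
    if get_connection is None:
        # Default path: use Airflow's own lookup and its "not found" error
        from airflow.exceptions import AirflowException
        from airflow.hooks.base_hook import BaseHook
        get_connection = BaseHook.get_connection
        missing = AirflowException
    else:
        # Assumed convention for injected lookups: unknown ids raise LookupError
        missing = LookupError
    try:
        get_connection(conn_id)
    except missing:
        return False
    return True
```

Unlike a query on the `Connection` table, this lookup also sees connections defined through `AIRFLOW_CONN_*` environment variables.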