Let's take an example DAG.

Here is the code for it.
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.models import TaskInstance
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator
def task_failure_notification_alert(context):
logging.info("Task context details: %s", str(context))
def dag_failure_notification_alert(context):
logging.info("DAG context details: %s", str(context))
def red_exception_task(ti: TaskInstance, **kwargs):
raise Exception('red')
default_args = {
"owner": "analytics",
"start_date": datetime(2021, 12, 12),
'retries': 0,
'retry_delay': timedelta(),
"schedule_interval": "@daily"
}
dag = DAG('logger_dag',
default_args=default_args,
catchup=False,
on_failure_callback=dag_failure_notification_alert
)
start_task = DummyOperator(task_id="start_task", dag=dag, on_failure_callback=task_failure_notification_alert)
red_task = PythonOperator(
dag=dag,
task_id='red_task',
python_callable=red_exception_task,
provide_context=True,
on_failure_callback=task_failure_notification_alert
)
end_task = DummyOperator(task_id="end_task", dag=dag, on_failure_callback=task_failure_notification_alert)
start_task >> red_task >> end_task
We can see two functions i.e. task_failure_notification_alert and dag_failure_notification_alert are being called in case of failures.
We can see logs in case of Task failure by the below steps.
We can see logs for the task as below.


but I am unable to find logs for the on_failure_callback of DAG anywhere in UI. Where can we see it?
Under airflow/logs find the "scheduler" folder, under it look for the specific date you ran the Dag for example 2022-12-03 and there you will see name of the dag_file.log.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With