I am having a tough time figuring out how to find the failed tasks of a DAG that runs twice on the same day (same execution day).
Consider an example: a DAG with dag_id=1 fails on its first run (for whatever reason, say a connection timeout) and a task is marked as failed. When I query the TaskInstance table, it contains the entry for the failed task. Great!
But if I re-run the same DAG (note that dag_id is still 1), then in the last task (which has the trigger rule ALL_DONE, so it runs regardless of whether its upstream tasks failed or succeeded) I want to count the tasks that failed in the current dag_run, ignoring previous dag_runs. I came across the dag_run id, which could be useful if it could be related to TaskInstance, but I could not find a way to do that. Any suggestions or help are appreciated.
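For context on why the dag_run id does not help: in Airflow 1.10.x the task_instance table has no dag_run id column. A task instance is tied to its DagRun through the pair (dag_id, execution_date), and a new run triggered on the same day (e.g. a manual trigger) gets its own execution_date timestamp. The sketch below models that relationship in plain Python without Airflow; `TaskInstanceRow` and `failed_in_run` are hypothetical names used only for illustration, and the state strings mirror Airflow's `State` values.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, List


# Hypothetical stand-in for a row of Airflow 1.10.x's task_instance table.
# There is no dag_run id column: (dag_id, execution_date) is what links a
# task instance to exactly one DagRun.
@dataclass
class TaskInstanceRow:
    dag_id: str
    task_id: str
    execution_date: datetime
    state: str


def failed_in_run(
    rows: Iterable[TaskInstanceRow], dag_id: str, execution_date: datetime
) -> List[str]:
    """Return the task_ids that failed in the run identified by (dag_id, execution_date)."""
    return [
        r.task_id
        for r in rows
        if r.dag_id == dag_id
        and r.execution_date == execution_date
        and r.state == "failed"
    ]


# Two runs of dag_id "1" on the same day, distinguished only by execution_date.
first_run = datetime(2020, 5, 1, 8, 0)
second_run = datetime(2020, 5, 1, 14, 30)

rows = [
    TaskInstanceRow("1", "extract", first_run, "failed"),
    TaskInstanceRow("1", "load", first_run, "upstream_failed"),
    TaskInstanceRow("1", "extract", second_run, "success"),
    TaskInstanceRow("1", "load", second_run, "failed"),
]

print(failed_in_run(rows, "1", second_run))  # ['load']
```

Filtering on execution_date as well as dag_id is what keeps the first run's failure out of the count for the second run.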
In Airflow 1.10.x the same result can be achieved with much simpler code that avoids touching the ORM directly:
from airflow.utils.state import State

def your_python_operator_callable(**context):
    # In Airflow 1.10.x the PythonOperator needs provide_context=True for **context to be filled.
    # get_dagrun() returns this task's own DagRun, so only its task instances are inspected.
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_count = sum(1 for ti in tis_dagrun if ti.state == State.FAILED)
    print(f"There are {failed_count} failed tasks in this execution")
One unfortunate problem is that context['ti'].get_dagrun() does not return a DagRun instance when a single task is tested from the CLI. As a result, manually testing that single task fails, while a normal run works as expected.