 

Clear an upstream task in Airflow within the DAG

I have a task in an Airflow DAG that has three child tasks. Unfortunately, there are cases where this parent task succeeds but two of the three children fail (and retrying the children won't fix them).

Fixing them requires the parent to run again, even though it didn't fail.

So I dutifully go into the Graph view of the DAG run and 'clear' this parent task along with all downstream tasks (+recursive).

Is there a way I can do this within the DAG itself?

asked Jan 21 '26 15:01 by yee379



2 Answers

If your tasks are part of a subdag, calling subdag.clear() in the on_retry_callback of the SubDagOperator should do the trick:

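from airflow.operators.subdag import SubDagOperator  # Airflow 2.x path; 1.10 uses airflow.operators.subdag_operator

SubDagOperator(
    subdag=subdag,
    task_id="...",
    # On retry, clear every task instance of the subdag for this run's
    # execution date, so the whole subdag runs again from the start.
    # The operator needs retries >= 1, otherwise on_retry_callback never fires.
    on_retry_callback=lambda context: subdag.clear(
        start_date=context['execution_date'],
        end_date=context['execution_date']),
    dag=dag
)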
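Passing the same execution_date as both start_date and end_date limits the clear to the task instances of the current run; earlier and later runs of the subdag are left alone. The pure-Python sketch below models only that inclusive date-window filter. TaskInstanceStub and in_window are hypothetical names for illustration, not Airflow's API.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a task instance: just an id and a run date."""
    task_id: str
    execution_date: datetime


def in_window(tis, start_date: datetime, end_date: datetime):
    """Keep instances whose execution_date falls in [start_date, end_date], inclusive."""
    return [ti for ti in tis if start_date <= ti.execution_date <= end_date]


run_1 = datetime(2024, 1, 1)
run_2 = datetime(2024, 1, 2)
tis = [
    TaskInstanceStub("child_a", run_1),
    TaskInstanceStub("child_a", run_2),
    TaskInstanceStub("child_b", run_2),
]

# start_date == end_date == the current run's date, as in the callback above.
selected = in_window(tis, run_2, run_2)
print([(ti.task_id, ti.execution_date.day) for ti in selected])
# [('child_a', 2), ('child_b', 2)]
```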
answered Jan 23 '26 21:01 by Christoph Hösler



We opted to use the clear_task_instances function from the airflow.models.taskinstance module:

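from airflow.models import taskinstance
from airflow.utils.session import provide_session  # Airflow 2.x path; 1.10 uses airflow.utils.db


@provide_session
def clear_tasks_fn(tis, session=None, activate_dag_runs=False, dag=None) -> None:
    """
    Wrapper for `clear_task_instances` to be used in a callback function
    (which receives only `context`); `provide_session` supplies `session`.
    """
    # Newer Airflow 2.x releases deprecate `activate_dag_runs` in favour of
    # `dag_run_state`; check the signature for your version.
    taskinstance.clear_task_instances(tis=tis,
                                      session=session,
                                      activate_dag_runs=activate_dag_runs,
                                      dag=dag)


def clear_tasks_callback(context) -> None:
    """
    Clears the tasks listed in the `task_ids_to_clear` param.

    To be used as `on_retry_callback`.
    """
    all_tasks = context["dag_run"].get_task_instances()
    dag = context["dag"]
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])

    # Keep only the task instances of this DAG run whose ids were requested.
    tasks_to_clear = [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]

    clear_tasks_fn(tasks_to_clear, dag=dag)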
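The core of clear_tasks_callback is a filter over the run's task instances by the ids listed in params. The pure-Python sketch below reproduces just that selection step, using a hypothetical TaskInstanceStub, a hypothetical DagRunStub and a hand-built context dict in place of what Airflow passes to the callback; nothing is actually cleared.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a task instance in the current DAG run."""
    task_id: str


class DagRunStub:
    """Hypothetical stand-in for context["dag_run"]."""

    def __init__(self, task_ids):
        self._tis = [TaskInstanceStub(t) for t in task_ids]

    def get_task_instances(self):
        return list(self._tis)


def select_tasks_to_clear(context) -> list[TaskInstanceStub]:
    """Mirror the filtering step of clear_tasks_callback."""
    all_tasks = context["dag_run"].get_task_instances()
    task_ids_to_clear = context["params"].get("task_ids_to_clear", [])
    return [ti for ti in all_tasks if ti.task_id in task_ids_to_clear]


context = {
    "dag_run": DagRunStub(["parent", "some_child", "other_child"]),
    "params": {"task_ids_to_clear": ["some_child", "parent"]},
}
print([ti.task_id for ti in select_tasks_to_clear(context)])
# ['parent', 'some_child']
```

Note that the result follows the order of the run's task instances, not the order of task_ids_to_clear, and a missing param clears nothing.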

You then pass the list of task ids to clear through params on whichever child task should trigger the clear, e.g.:

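from airflow.operators.dummy import DummyOperator  # Airflow 2.x path; 1.10 uses airflow.operators.dummy_operator

# Operators must be initialized with keyword arguments, so task_id is passed by name.
DummyOperator(task_id='some_child',
              on_retry_callback=clear_tasks_callback,
              params=dict(
                  task_ids_to_clear=['some_child', 'parent']
              ),
              # At least one retry is needed, otherwise on_retry_callback never fires.
              retries=1
)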
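To reproduce the question's manual clear of the parent "and all downstream tasks", task_ids_to_clear would have to contain the parent plus every task reachable from it, not just its direct children. The pure-Python sketch below computes that list with a breadth-first walk over a plain dependency map; the map, the task names (parent, child_a, ...) and the downstream_closure helper are hypothetical, and in a real DAG the dependencies would come from the DAG object rather than a hand-written dict.

```python
from __future__ import annotations

from collections import deque


def downstream_closure(start: str, edges: dict[str, list[str]]) -> list[str]:
    """Return `start` plus every task reachable from it, in breadth-first order.

    `edges` maps a task id to the ids of its direct downstream tasks.
    """
    seen = {start}
    order = [start]
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for child in edges.get(current, []):
            if child not in seen:  # skip tasks reached via another path
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


# Hypothetical DAG shape from the question: one parent with three children.
edges = {"parent": ["child_a", "child_b", "child_c"]}
print(downstream_closure("parent", edges))
# ['parent', 'child_a', 'child_b', 'child_c']
```

Clearing this whole list also re-runs the child that succeeded; listing only the failing child and the parent, as in the answer above, avoids that.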
answered Jan 23 '26 19:01 by Yoni M
