
Apache Airflow move from branch to another branch

Tags:

airflow

I'm having trouble moving from one branch to another in Apache Airflow. I have a DAG that depends on three branch operators:

all_empty_branch_task >> generate_round_task >> load_tasks
all_empty_branch_task >> resolving_branch_task
resolving_branch_task >> [
        export_final_annotation_task, annotation_branch_task, cleansing_branch_task]

[Image: the DAG in the Airflow UI]

I confirmed that the Python callable of resolving_branch_task (check-resolving-branch) returns the task_id of annotation_branch_task (check-annotation-branch), which is itself a Python branch task. However, after resolving_branch_task finishes executing, everything downstream is skipped. I'm not sure what is wrong. Notably, when I return the task_id of a normal (non-branch) task, that task executes successfully. Could anyone help, please? I would appreciate it.

samyouaret asked Sep 06 '25 21:09


1 Answer

A BranchPythonOperator task skips every task in any "branch" that its python_callable does not return. So when "check-resolving-branch" does not choose "export-final-annotation-task", that task is skipped, and so are its downstream tasks, which include the "check-annotation-branch" task and all of the other tasks in the DAG.

To fix this, you can use Trigger Rules in Airflow. By default, the Trigger Rule for every task is "all_success". In this use case, you can set the Trigger Rule of the "check-annotation-branch" task to "all_done", which allows the task to execute once all of its upstream tasks have finished (i.e., succeeded, failed, or been skipped).

Here is an example that should give you an idea of what to implement in your DAG:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(
    dag_id="branch_test",
    start_date=datetime(2021, 9, 10),
    schedule_interval=None,
) as dag:

    @task
    def func1():
        ...

    @task
    def func2():
        ...

    @task
    def func3():
        ...

    @task
    def func4():
        ...

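    # branch_1 always returns "branch_2", so func2 (the task not chosen) is skipped.
    branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "branch_2")
    # ALL_DONE lets branch_2 run even though one of its upstream tasks (func2) was skipped.
    branch_2 = BranchPythonOperator(
        task_id="branch_2", python_callable=lambda: "func3", trigger_rule=TriggerRule.ALL_DONE
    )

    func1() >> branch_1 >> func2() >> branch_2
    branch_1 >> branch_2 >> [func3(), func4()]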
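
To see why the trigger rule matters in the example above, here is a rough pure-Python sketch that traces the task states by hand. It is a toy model, not Airflow's scheduler: it assumes every task that runs succeeds, it only knows the "all_success" and "all_done" rules, and the names `simulate`, `ORDER`, `UPSTREAM` and `BRANCH_CHOICE` are made up for illustration.

```python
from typing import Dict

SUCCESS, SKIPPED = "success", "skipped"

# Hand-written copy of the example DAG's shape (hypothetical helper data,
# not read from Airflow). ORDER is a valid topological order.
ORDER = ["func1", "branch_1", "func2", "branch_2", "func3", "func4"]
UPSTREAM = {
    "branch_1": ["func1"],
    "func2": ["branch_1"],
    "branch_2": ["func2", "branch_1"],
    "func3": ["branch_2"],
    "func4": ["branch_2"],
}
# Task id returned by each branch task's callable.
BRANCH_CHOICE = {"branch_1": "branch_2", "branch_2": "func3"}


def simulate(trigger_rules: Dict[str, str]) -> Dict[str, str]:
    """Return the final state of each task under the given trigger rules."""
    state: Dict[str, str] = {}
    for task in ORDER:
        if task in state:
            # Already skipped by an upstream branch task.
            continue
        upstream_states = [state[u] for u in UPSTREAM.get(task, [])]
        rule = trigger_rules.get(task, "all_success")
        # "all_success": a skipped upstream task makes this task skipped too.
        # "all_done": the task runs regardless of how its upstreams finished.
        if rule == "all_success" and any(s != SUCCESS for s in upstream_states):
            state[task] = SKIPPED
            continue
        state[task] = SUCCESS  # simplification: a task that runs never fails
        chosen = BRANCH_CHOICE.get(task)
        if chosen is not None:
            # A branch task skips its direct downstream tasks that it did not choose.
            for other in ORDER:
                if task in UPSTREAM.get(other, []) and other != chosen:
                    state[other] = SKIPPED
    return state


default_run = simulate({})
all_done_run = simulate({"branch_2": "all_done"})
print(default_run)
print(all_done_run)
```

With the default rule, branch_2 is skipped because func2 was skipped, and func3 and func4 are skipped after it. With "all_done" on branch_2, branch_2 runs, func3 runs, and only func2 and func4 are skipped.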


Josh Fell answered Sep 11 '25 00:09