 

Task after BranchPythonOperator task getting skipped

Tags:

airflow

I created a BranchPythonOperator that picks one of two downstream tasks depending on a condition:

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)

This is the CheckTable callable class:

class CheckTable:
    """
    DAG task to check if table exists or not.
    """

    def __call__(self, **kwargs) -> str:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = "SELECT EXISTS ( \
            SELECT 1 FROM information_schema.tables \
            WHERE table_schema = 'public' \
            AND table_name = 'users');"

        table_exists = pg_hook.get_records(query)[0][0]
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"

The issue is that both tasks get skipped when the typicon_check_table task runs.

How can I fix this?

[Screenshot of the DAG run]

asked Mar 22 '26 02:03 by Manish Gupta


2 Answers

I have worked through the same scenario, and it works fine for me with the code below:

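# Airflow 2.x import paths
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator


class DAGConditionalValidation:

    def __init__(self, conditional_param_key):
        self.conditional_param_key = conditional_param_key

    def __call__(self, **kwargs):
        # A branch callable must return the task_id (a string) of the task to follow,
        # not the operator object itself.
        if self.conditional_param_key == 'Y':
            return 'slot_population_on_is_y'
        return 'slot_population_on_is_n'


# The class is defined first so it exists when the operator is built.
# These operators are assumed to be created inside a `with DAG(...):` block.
slot_population_on_is_y_or_n = BranchPythonOperator(
    task_id='slot_population_on_is_y_or_n',
    python_callable=DAGConditionalValidation('Y'),
    trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]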

All of your code looks fine, but you're missing the trigger rule: set trigger_rule='one_success'.
This should work for you as well.

answered Mar 23 '26 14:03 by Saleem Shaiks


The task typicon_load_data has typicon_create_table as a parent and the default trigger_rule is all_success, so I am not surprised by this behaviour.

Two possible cases here:

  1. CheckTable() returns typicon_load_data: typicon_create_table is skipped, and because typicon_load_data is downstream of it, typicon_load_data is skipped as well.
  2. CheckTable() returns typicon_create_table: that task runs and then triggers typicon_load_data, which is skipped because it was the excluded branch.

I assume your screenshot is from case 1?
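To make the case 1 reasoning concrete, here is a small pure-Python model of how a task's trigger rule turns its parents' final states into a decision. It is a simplified sketch, not Airflow's code: the function evaluate and the state constants are hypothetical, only three rules are modelled, and it assumes every upstream task has already finished. It shows that under the default all_success, the skipped typicon_create_table parent is enough to skip typicon_load_data, while a more permissive rule on typicon_load_data lets it run.

```python
"""Simplified model of trigger-rule evaluation (not Airflow's implementation).

Assumes all upstream tasks have finished; FAILED stands in for any failed state.
"""
from typing import Dict

SUCCESS, SKIPPED, FAILED = "success", "skipped", "failed"


def evaluate(trigger_rule: str, upstream_states: Dict[str, str]) -> str:
    """Return 'run', 'skipped' or 'upstream_failed' for a task whose parents
    finished with the given states."""
    states = list(upstream_states.values())
    successes = states.count(SUCCESS)
    skipped = states.count(SKIPPED)
    failed = states.count(FAILED)

    if trigger_rule == "all_success":  # Airflow's default rule
        if failed:
            return "upstream_failed"
        if skipped:
            return "skipped"  # a single skipped parent skips the task
        return "run"
    if trigger_rule == "one_success":
        if successes:
            return "run"
        return "skipped" if skipped == len(states) else "upstream_failed"
    if trigger_rule == "none_failed_min_one_success":
        if failed:
            return "upstream_failed"
        return "run" if successes else "skipped"
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# Case 1: CheckTable() returned "typicon_load_data", so the branch task
# succeeded and typicon_create_table was skipped.
case_1 = {"typicon_check_table": SUCCESS, "typicon_create_table": SKIPPED}

for rule in ("all_success", "one_success", "none_failed_min_one_success"):
    print(rule, "->", evaluate(rule, case_1))
# all_success -> skipped
# one_success -> run
# none_failed_min_one_success -> run
```

In a real DAG the rule would be passed as trigger_rule=... when creating typicon_load_data, since that is the task with the skipped parent. The name none_failed_min_one_success comes from Airflow 2.2 and later; older releases used none_failed_or_skipped for the equivalent rule, so check the trigger-rule documentation for your version.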

answered Mar 23 '26 16:03 by Alessandro Cosentino

