Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Airflow ExternalTaskSensor with different scheduler interval

Currently I have two DAGs: DAG_A and DAG_B. Both runs with schedule_interval=timedelta(days=1)

DAG_A has a Task1 which usually takes 7 hours to run. And DAG_B only takes 3 hours.

DAG_B has a ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1") but also uses some other information X that is generated hourly.

What is the best way to increase the frequency of DAG_B so that it runs at least 4 times a day? As far as I know, both DAGs must have the same schedule_interval. However, I want to update X on DAG_B as much as I can.


One possibility is to create another DAG that has a ExternalTaskSensor for DAG_B. But I don't think it's the best way.

like image 220
Leonardo Farias Avatar asked Oct 25 '25 09:10

Leonardo Farias


2 Answers

If I understood you correctly, your conditions are:

  • Keep running DAG_A daily
  • Run DAG_B n times a day
  • Every time DAG_B runs it will wait for DAG_A__Task_1 to be completed

I think you could easily adapt your current design by instructing ExternalTaskSensor to wait for the desired execution date of DAG_A.

From the ExternalTaskSensor operator defnition:

Waits for a different DAG or a task in a different DAG to complete for a specific execution_date

That execution_date could be defined using execution_date_fn parameter:

execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

You could define the sensor like this:

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_task_id="external_task_1",
        external_dag_id='dag_a_id',
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of_dag_a,
        poke_interval=30
    )

Where _get_execution_date_of_dag_a performs a query to the DB using get_last_dagrun allowing you to get the last execution_date of DAG_A.

from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun

@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None,  **kwargs):
    dag_a_last_run = get_last_dagrun(
        'dag_a_id', session)
    return dag_a_last_run.execution_date

I hope this approach helps you out. You can find a working example in this answer.

like image 65
NicoE Avatar answered Oct 28 '25 04:10

NicoE


Combining @Gonza Piotti's comment with @NicoE's answer:

from airflow.utils.db import provide_session
from airflow.models.dag import get_last_dagrun

def _get_execution_date_of(dag_id):
    @provide_session
    def _get_last_execution_date(exec_date, session=None, **kwargs):
        dag_a_last_run = get_last_dagrun(dag_id, session)
        return dag_a_last_run.execution_date
    return _get_last_execution_date

we get a function that will yield another function which computes the last execution date of a given dag_id, use it like:

    wait_for_dag_a = ExternalTaskSensor(
        task_id='wait_for_dag_a',
        external_task_id='external_task_1',
        external_dag_id='dag_a',
        allowed_states=['success', 'failed'],
        execution_date_fn=_get_execution_date_of('dag_a'),
        poke_interval=30
    )
like image 38
Guilherme Z. Santos Avatar answered Oct 28 '25 04:10

Guilherme Z. Santos