Can I add a delay to a schedule in airflow?

Tags:

airflow

I have a pipeline I want to run every day, but I would like the execution date to lag. That is, on day X I want the execution date to be X-3. Is something like that possible?

Asked by Tomas Jansson on Dec 07 '25

2 Answers

You can use a TimeSensor to delay the execution of tasks in a DAG. I don't think you can change the actual execution_date unless you can describe the behavior as a cron schedule.

If you only want to apply this delay to a subset of scheduled DAG runs, you could use a BranchPythonOperator to first check whether execution_date is one of the days you want the lag. If it is, take the branch with the sensor; otherwise, move along without it.
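A rough sketch of that branching setup, assuming Airflow 1.10, placeholder task ids, and a hypothetical "only delay on Mondays" rule:

from datetime import datetime, time

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.sensors.time_sensor import TimeSensor

dag = DAG('maybe_delayed', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

def pick_branch(**context):
    # Hypothetical rule: only delay runs whose execution_date falls on a Monday
    if context['execution_date'].weekday() == 0:
        return 'wait_for_target_time'
    return 'skip_delay'

branch = BranchPythonOperator(task_id='branch', python_callable=pick_branch,
                              provide_context=True, dag=dag)
wait = TimeSensor(task_id='wait_for_target_time',
                  target_time=time(6, 0), dag=dag)
skip = DummyOperator(task_id='skip_delay', dag=dag)
# one_success lets the join task run even though one branch gets skipped
join = DummyOperator(task_id='join', trigger_rule='one_success', dag=dag)

branch >> [wait, skip]
join << [wait, skip]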

Alternatively, especially if you plan to have this behavior in more than one DAG, you can write a modified version of the sensor. It might look something like this:

from airflow.sensors.time_sensor import TimeSensor
from airflow.utils import timezone

class LaggedTimeSensor(TimeSensor):
    def poke(self, context):
        # should_delay() is your own check for whether this run needs the lag
        if should_delay(context['execution_date']):
            self.log.info('Checking if the time (%s) has come', self.target_time)
            return timezone.utcnow().time() > self.target_time
        else:
            self.log.info('Not one of those days, just run')
            return True

You can reference the code for the existing time sensor in https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40.
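A minimal usage sketch, assuming the LaggedTimeSensor subclass above and placeholder values for the DAG and target_time:

from datetime import datetime, time

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG('lagged_sensor_example', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

# Wait until 06:00 UTC, but only on the days should_delay() selects
wait_if_needed = LaggedTimeSensor(task_id='wait_if_needed',
                                  target_time=time(6, 0), dag=dag)
process = DummyOperator(task_id='process', dag=dag)

wait_if_needed >> process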

Answered by Daniel Huang on Dec 11 '25


It looks like you are using execution_date as a variable in your pipeline logic, for example to process data that is 3 days older than the execution_date. So, instead of making execution_date lag by 3 days, you can subtract the lag from execution_date and use the result in your pipeline logic. Airflow provides a number of ways to do this (see the sketch after the list):

  1. Templates: {{ execution_date - macros.timedelta(days=3) }}. For example, the bash_command parameter of a BashOperator can be bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }}'
  2. The PythonOperator's Python callable: define the callable as something like def func(execution_date, **kwargs): ... and set the PythonOperator's parameter provide_context=True. The execution_date parameter of func() will be set to the current execution date (a datetime object) when it is called. So, inside func() you can do processing_date = execution_date - timedelta(days=3).
  3. The sensors' context parameter: the poke() and execute() methods of any sensor have a context parameter, which is a dict containing all macros, including execution_date. So, in these methods you can do processing_date = context['execution_date'] - timedelta(days=3).
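A minimal sketch of options 1 and 2, assuming Airflow 1.10 and a hypothetical daily DAG called lagged_processing (option 3 works the same way inside a sensor's poke()):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG('lagged_processing', start_date=datetime(2018, 1, 1),
          schedule_interval='@daily')

# Option 1: subtract the lag in a Jinja template
echo_date = BashOperator(
    task_id='echo_processing_date',
    bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }}',
    dag=dag,
)

# Option 2: subtract the lag inside a Python callable
def process(execution_date, **kwargs):
    processing_date = execution_date - timedelta(days=3)
    print('Processing date: %s' % processing_date)

process_lagged = PythonOperator(
    task_id='process_lagged',
    python_callable=process,
    provide_context=True,  # Airflow 1.x: pass execution_date and other context
    dag=dag,
)

echo_date >> process_lagged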

Forcing the execution date to have a lag simply does not feel right, because, according to Airflow's logic, the execution date of a currently running DAG normally lags only when it is catching up (backfilling).

Answered by SergiyKolesnikov on Dec 11 '25


