I have a pipeline I want to run everyday, but I would like the execution date to lag. That is, on day X I want the execution date to be X-3. Is something like that possible?
You can use a TimeSensor to delay the execution of tasks in a DAG. I don't think you can change the actual execution_date unless you can describe the behavior as a cron.
If you want this to only apply this delay for a subset of scheduled DAG runs, you could use a BranchPythonOperator to first check if execution_date is one of those days you want the lag. If it is, then take the branch with the sensor. Otherwise, move along without it.
Alternatively, especially if you plan to have this behavior in more than one DAG, you can write a modified version of the sensor. It might look something like this:
def poke(self, context):
if should_delay(context['execution_date']):
self.log.info('Checking if the time (%s) has come', self.target_time)
return timezone.utcnow().time() > self.target_time
else:
self.log.info('Not one of those days, just run')
return True
You can reference the code for the existing time sensor in https://github.com/apache/incubator-airflow/blob/1.10.1/airflow/sensors/time_sensor.py#L38-L40.
It looks like you are using execution_date as a variable in your pipeline logic. For example, to process the data that is 3 days older than the execution_date. So, instead of making execution_date to lag by 3 days you can subtract the lag from execution_date and use the result in you pipeline logic. Airflow provides a number of ways to do it:
{{ execution_date - macros.timedelta(days=3) }}. So, for example, the bash_command parameter of BashOperator can be bash_command='echo Processing date: {{ execution_date - macros.timedelta(days=3) }} 'def func(execution_date, **kwargs): ... and set the PythonOperator's parameter provide_context=True. The execution_date parameter of func() will be set to the current execution date (datetime object) on call. So, inside func() you can do processing_date = execution_date - timedelta(days=3).context parameter: The poke() and execute() methods of any sensor have the context paramter that is a dict with all macros including execution_date. So, in these methods you can do processing_date = context['execution_date'] - timedelta(days=3).Forcing execution date to have a lag simply does not feel right. Because, according to the Airflow's logic, the execution date of the currently running DAG normally can have lag only if it is catching up (bakcfilling).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With