I want to use execution_date_fn in External task sensor in my dag to make it dependent on previous instance(I do not want to use depends_on_past parameter). Can someone please tell me how can I get previous execution id of the same dag using 'execution_date_fn', so that I do not have to specify hours/minutes in 'execution_delta'. Not sure if below method 'prev_execution_date_1' is correct. Appreciate any help on this.
def prev_execution_date_1(**kwargs):
dr = self.get_dagrun(session=session)
previous_scheduled_date = dr.previous_schedule(self.execution_date)
return previous_scheduled_date
external_0 = ExternalTaskSensor(
task_id='Check_Previous_Instance',
external_task_id=None,
external_dag_id='dag_abc_1',
allowed_states=['success'],
execution_date_fn=prev_execution_date_1,
dag=dag
)
I got the answer, thought of posting it here, it may help someone.
Below method will return execution id based on if condition I mentioned. In this method, you can see it will return 2 days before execution id if it satisfies the condition, else will return yesterday's execution id.
def prev_execution_dt(execution_date, **kwargs):
weekday=execution_date.strftime('%A')
print(weekday)
if weekday == "Thursday":
execution_dt_derived=execution_date - timedelta(hours=72)
print(execution_dt_derived)
else:
execution_dt_derived=execution_date - timedelta(hours=24)
print(execution_dt_derived)
return execution_dt_derived
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With