I am trying to execute a task after 5 minutes from the parent task inside a DAG.
DAG : Task 1 ----> Wait for 5 minutes ----> Task 2
How can I achieve this in Apache Airflow? Thanks in advance.
Timeouts. If you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime. timedelta value that is the maximum permissible runtime. This applies to all Airflow tasks, including sensors.
You can have the Airflow Scheduler be responsible for starting the process that turns the Python files contained in the DAGs folder into DAG objects that contain tasks to be scheduled.
retries (int) – the number of retries that should be performed before failing the task. retry_delay (datetime.timedelta) – delay between retries. retry_exponential_backoff (bool) – allow progressive longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)
The said behaviour can be achieved by introducing a task that forces a delay of specified duration between your Task 1 and Task 2
This can be achieved using PythonOperator
import time
from airflow.operators.python import PythonOperator
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   python_callable=lambda: time.sleep(300))
task_1 >> delay_python_task >> task_2
Or using BashOperator as well
from airflow.operators.bash import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2
Note: The given code-snippets are NOT tested
References
example_python_operator.pyexample_bash_operator.pyUPDATE-1
Here are some other ways of introducing delay
on_success_callback / on_failure_callback: Depending of whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda: time.sleep(300) in either of these params of Task 1.pre_execute() / post_execute(): Invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course this would involve modifying code for your tasks (1 or 2) so better avoid itPersonally I would prefer the extra task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2
@y2k-shubham gave the best answer to date, however, I want to warn not to use the callback solution. as it first marks the task as success and then executes the callback. which means task2 will not see any delay. if you don't want to use a separate task, you can use something like this:
< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id'task2', dag=dag)
task1 >> task2
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With