As stated in the Apache Airflow documentation, I can control how often a DAG is updated by setting the configuration variable min_file_process_interval in my airflow.cfg file:
min_file_process_interval
Number of seconds after which a DAG file is parsed. The DAG file is parsed every min_file_process_interval number of seconds. Updates to DAGs are reflected after this interval. Keeping this number low will increase CPU usage.
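For reference, the setting lives in the [scheduler] section of airflow.cfg and can also be overridden with an environment variable (following Airflow's AIRFLOW__{SECTION}__{KEY} convention):

[scheduler]
# parse each DAG file at most once every 30 seconds (the default)
min_file_process_interval = 30

# equivalent environment variable
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL=30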
However, I didn't find any clue or best practice about which value I should set for min_file_process_interval.
My DAG changes once a day. By default, min_file_process_interval is set to 30 seconds, which means most parses are useless: as long as the DAG doesn't change, the re-parsed DAG and the previous DAG are identical, yet each parse consumes resources and generates logs. But if I re-parse the DAG only once a day, do I risk running an outdated DAG when the code changes after the daily parse, or is the DAG also re-parsed just before each run?
What value should I set for min_file_process_interval in this case?
EDIT: As stated in Elad's answer responding to a previous version of this question, dynamic DAGs should be avoided. However, if I do have dynamic DAGs, how should I choose min_file_process_interval? A sketch of what I mean by "dynamic DAGs" follows.
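By "dynamic DAGs" I mean a .py file that generates DAG objects at parse time, something like this minimal sketch for Airflow 2.x (the source list here is just a hypothetical stand-in for whatever drives the generation):

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

# Hypothetical list of sources; in practice this might come from a config file or a database
for source in ['a', 'b', 'c']:
    with DAG(
        dag_id=f'process_{source}',
        start_date=datetime(2021, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ) as dag:
        DummyOperator(task_id='start')
    # Expose each generated DAG at module level so the scheduler picks it up
    globals()[f'process_{source}'] = dag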
You are mixing two different things.
min_file_process_interval controls how often Airflow scans the .py files and updates the DAGs within Airflow. When you deploy a new .py file, Airflow needs to read it and register the DAG in the metastore database - the setting determines how often that happens.
For your use case the DAG code should not be updated every day - in fact it should not be updated at all. It should just run every day. Your DAG just needs to be able to pick the correct file for each date. Your code can be something like:
from datetime import datetime

from airflow import DAG
from airflow.providers.ftp.sensors.ftp import FTPSensor

# Minimal default_args; start_date is required for the DAG to be scheduled
default_args = {'start_date': datetime(2021, 1, 1)}

with DAG(
    dag_id='stackoverflow',
    default_args=default_args,
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Waits for a file or directory to be present on FTP.
    sensor_op = FTPSensor(
        task_id='sensor_task',
        path='/my_folder/{{ ds }}/file.csv',  # path to your file on the server
        fail_on_transient_errors=False,
        ftp_conn_id='ftp_default',
    )

    # Operator to process the file (placeholder - replace with your own operator)
    operator_op = SomeOperator()

    sensor_op >> operator_op
That DAG will start a run every day. The first operator is a sensor, so if the file for that day isn't present yet, the workflow will wait; only once it appears will the workflow continue to the second operator, which should process the file.
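If the file can show up many hours after midnight, you may also want to tune the sensor's poking behaviour using the standard BaseSensorOperator parameters - a sketch, where the interval and timeout values are arbitrary:

sensor_op = FTPSensor(
    task_id='sensor_task',
    path='/my_folder/{{ ds }}/file.csv',
    fail_on_transient_errors=False,
    ftp_conn_id='ftp_default',
    poke_interval=300,      # check for the file every 5 minutes
    timeout=60 * 60 * 12,   # fail if the file hasn't appeared after 12 hours
    mode='reschedule',      # free the worker slot between pokes
)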
Note that the path parameter of FTPSensor is templated. This means you can use macros like {{ ds }}, which gives you a path containing each day's date, for example:
/my_folder/2021-05-01/file.csv
/my_folder/2021-05-02/file.csv
/my_folder/2021-05-03/file.csv
You can also use path='/my_folder/{{ ds }}.csv', which will give:
/my_folder/2021-05-01.csv
/my_folder/2021-05-02.csv
/my_folder/2021-05-03.csv
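If you want to verify what the template renders to for a specific date, the Airflow CLI can show you (assuming Airflow 2.x):

airflow tasks render stackoverflow sensor_task 2021-05-01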