I'm trying to setup an Airflow DAG that provides default values available from dag_run.conf
. This works great when running the DAG from the webUI, using the "Run w/ Config" option. However when running on the schedule, the dag_run.conf
dict is not present, and the task will fail, e.g.
jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
Below is an example job.
Is it possible to make it so that dag_run.conf
always contains the dict defined by params
here?
from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta
def do_something(val1: str, val2: str) -> str:
return f'echo vars are: "{val1}, {val2}"'
params = {
'key1': 'def1',
'key2': 'def2',
}
default_args = {
'retries': 0,
}
with DAG(
'template_test',
default_args=default_args,
schedule_interval=timedelta(minutes=1),
start_date=hours_ago(1),
params = params,
) as dag:
hello_t = BashOperator(
task_id='example-command',
bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
config=params,
)
The closest I've seen is in For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?, however there they leverage Jinja and if/else - which would require defining these default parameters twice. I'd like to define them only once.
You could use DAG params to achieve what you are looking for:
params (dict) – a dictionary of DAG level parameters that are made accessible in templates, namespaced under params. These params can be overridden at the task level.
You can define params
at DAG or Task levels and also add or modify them from the UI in the Trigger DAG w/ config section.
Example DAG:
default_args = {
"owner": "airflow",
}
dag = DAG(
dag_id="example_dag_params",
default_args=default_args,
schedule_interval="*/5 * * * *",
start_date=days_ago(1),
params={"param1": "first_param"},
catchup=False,
)
with dag:
bash_task = BashOperator(
task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
)
Output log:
[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_dag_params
AIRFLOW_CTX_TASK_ID=bash_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0
From the logs, notice that the dag_run
is scheduled and the params are still there.
You can find a more extensive example on using parameters in this answer.
Hope that works for you!
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With