I have a simple dag that uses a branch operator to check if y is False. If it is, the dag is supposed to move on to the say_goodbye task group. If True, it skips and goes to finish_dag_step. Here's the dag:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

def which_step() -> str:
    y = False
    if not y:
        return 'say_goodbye'
    else:
        return 'finish_dag_step'

with DAG(
    'my_test_dag',
    start_date = datetime(2022, 5, 14),
    schedule_interval = '0 0 * * *',
    catchup = True) as dag:
    say_hello = BashOperator(
        task_id = 'say_hello',
        retries = 3,
        bash_command = 'echo "hello world"'
    )
    run_which_step = BranchPythonOperator(
        task_id = 'run_which_step',
        python_callable = which_step,
        retries = 3,
        retry_exponential_backoff = True,
        retry_delay = timedelta(seconds = 5)
    )
    with TaskGroup('say_goodbye') as say_goodbye:
        for i in range(0,2):
            step = BashOperator(
                task_id = 'step_' + str(i),
                retries = 3,
                bash_command = 'echo "goodbye world"'
            )
            step
    finish_dag_step = BashOperator(
        task_id = 'finish_dag_step',
        retries = 3,
        bash_command = 'echo "dag is finished"'
    )
    say_hello >> run_which_step
    run_which_step >> say_goodbye >> finish_dag_step
    run_which_step >> finish_dag_step
    finish_dag_step
I get the following errors when the dag hits run_which_step:
I don't understand what's causing this. What is going on?
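A branch can't point at a TaskGroup as a whole: the callable of a BranchPythonOperator has to return task IDs, and 'say_goodbye' is the group's ID, not a task's. Therefore, you have to refer to the tasks inside the group by their task_id, which is the TaskGroup's name and the task's id joined by a dot (task_group.task_id).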
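To make the naming rule concrete, here is a tiny pure-Python sketch. `group_task_id` is a hypothetical helper, not an Airflow API; it only assumes Airflow's default `prefix_group_id=True`, under which the group ID and the task's own ID are joined by a dot.

```python
def group_task_id(group_id: str, task_id: str) -> str:
    """Hypothetical helper: build the full ID of a task defined inside a TaskGroup.

    Assumes the default prefix_group_id=True behaviour, where the group ID
    and the task's own ID are joined by a dot.
    """
    return f"{group_id}.{task_id}"


# The two tasks created inside the question's `say_goodbye` group.
goodbye_ids = [group_task_id("say_goodbye", f"step_{i}") for i in range(0, 2)]
print(goodbye_ids)  # ['say_goodbye.step_0', 'say_goodbye.step_1']
```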
Your branching function should return something like
def which_step():
    y = False
    if not y:
        # Tasks inside a TaskGroup are addressed as "<group_id>.<task_id>"
        return [f'say_goodbye.step_{i}' for i in range(0, 2)]
    return 'finish_dag_step'
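Why the question's original callable fails can be modelled with a simplified check: whatever the callable returns has to be one or more task IDs that exist in the DAG, and `say_goodbye` is only the group's ID. `check_branch` below is a hypothetical stand-in written for illustration, not the validation code Airflow actually runs, and its error message is made up.

```python
def check_branch(returned, task_ids):
    """Hypothetical, simplified model of what a branch result must satisfy:
    every returned ID has to be the ID of a task that exists in the DAG."""
    # A branch callable may return a single ID or a list of IDs.
    chosen = [returned] if isinstance(returned, str) else list(returned)
    unknown = [t for t in chosen if t not in task_ids]
    if unknown:
        raise ValueError(f"not task IDs: {unknown}")
    return chosen


# Task IDs that exist in the question's DAG; the group ID "say_goodbye" is not one.
TASK_IDS = {
    "say_hello",
    "run_which_step",
    "say_goodbye.step_0",
    "say_goodbye.step_1",
    "finish_dag_step",
}

print(check_branch([f"say_goodbye.step_{i}" for i in range(0, 2)], TASK_IDS))
try:
    check_branch("say_goodbye", TASK_IDS)
except ValueError as err:
    print(err)  # not task IDs: ['say_goodbye']
```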
But instead of returning a list of task IDs like that, probably the easiest option is to put a DummyOperator (called EmptyOperator from Airflow 2.3 on) upstream of the TaskGroup and have the branch return its task_id. It then effectively acts as an entry point to the whole group.
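Why a single entry point is enough can be seen in a toy graph model: once the branch returns one task ID, everything downstream of that task lies on the followed path. The sketch assumes a hypothetical entry task called `goodbye_entry` wired in front of the group, and it is plain breadth-first reachability, not Airflow's scheduler.

```python
from collections import deque

# Toy model of the question's DAG after adding a single entry task in front
# of the group. "goodbye_entry" is a hypothetical task ID.
EDGES = {
    "run_which_step": ["goodbye_entry", "finish_dag_step"],
    "goodbye_entry": ["say_goodbye.step_0", "say_goodbye.step_1"],
    "say_goodbye.step_0": ["finish_dag_step"],
    "say_goodbye.step_1": ["finish_dag_step"],
    "finish_dag_step": [],
}


def reachable_from(start_ids):
    """Return every task reachable from start_ids (inclusive), breadth-first."""
    seen = set(start_ids)
    queue = deque(start_ids)
    while queue:
        task = queue.popleft()
        for child in EDGES[task]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Returning just the entry task's ID puts every task in the group on the path.
print(sorted(reachable_from(["goodbye_entry"])))
# ['finish_dag_step', 'goodbye_entry', 'say_goodbye.step_0', 'say_goodbye.step_1']
```

The model ignores trigger rules; in a real DAG, `finish_dag_step`'s `trigger_rule` decides whether it still runs when the other branch has been skipped.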