I want a whole task group to run on the output of a single task, where both task and task group are defined via decorators - @task and @task_group respectively.
Somewhat similar to

For that, I adapted one of the examples provided by Airflow. Here get's output is fixed, but in reality it varies:
import json
from datetime import datetime

from airflow.decorators import dag, task, task_group

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag(dag_id='xcom_taskflow_dag', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task()
    def get():
        return {"data": [('a', 'A'), ('b', 'B')]}

    @task_group(group_id='group')
    def group(data: dict):
        tasks = []
        for i, d in enumerate(data):
            @task(task_id=f'subtask_{i}')
            def unitask(d):
                return {"result": d}
            task_result = unitask(d)
            tasks.append(task_result)
        return tasks

    group(get())

dag = taskflow()
The error I get is:
TypeError: 'XComArg' object is not iterable
Debugging the data variable, I see that it is a string:
{{ task_instance.xcom_pull(task_ids='get', dag_id='xcom_taskflow_dag', key='return_value') }}
Is there a reasonable way to render data so that it can be accessed as get's actual output?
Is any architectural principle being violated?
What would be an alternative way to achieve the final goal?
You are trying to create tasks dynamically based on the result of the task get, and this result is only available at runtime. Instead, you can use Dynamic Task Mapping (introduced in Airflow 2.3) to create multiple tasks at runtime. You can do that with or without task_group; if you only want the task_group to group these tasks visually, it adds little, because mapped tasks are already grouped into a single node in the UI:
from datetime import datetime

from airflow.decorators import dag, task, task_group

default_args = {
    'start_date': datetime(2021, 1, 1)
}

@dag(dag_id='xcom_taskflow_dag', schedule_interval='@daily', default_args=default_args, catchup=False)
def taskflow():

    @task()
    def get():
        return [('a', 'A'), ('b', 'B')]

    @task_group(group_id='group')
    def group(data):

        @task(task_id='subtask')
        def unitask(t):
            return {"result": t[1]}

        return unitask.expand(t=data)

    group(get())

dag = taskflow()

And if get returns a dict and you don't want to change that task, you can create an intermediate task that reads get's result and returns a list, then use the new task's output as the input for your group.