When generating tasks dynamically, I need to have Task 2 be dependent of Task 1, Task1 >> Task 2 or task2.set_upstream(task1).
Since the task_ids are evaluated, or seem to be upfront, I cannot set the dependency in advance, any help would be appreciated.
The Component(I) tasks generate fine, except that they all run at once.
for i in range(1,10):
  task_id='Component'+str(i)
  task_id = BashOperator(
  task_id='Component'+str(i),
  bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
  xcom_push=True,
  dag=dag) 
  ?????.set_upstream(??????)
Airflow's dynamic task mapping feature is built off of the MapReduce programming model. The map procedure takes a set of inputs and creates a single task for each one. The reduce procedure, which is optional, allows a task to operate on the collected output of a mapped task.
According to the official Airflow docs, The task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task).
You can run a task independently by using -i/-I/-A flags along with the run command. But yes the design of airflow does not permit running a specific task and all its dependencies.
Use the following code:
a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
        xcom_push=True,
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]
Using a DummyOperator, the codes looks like:
a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]
This would generate the following DAG:

You can follow a pattern like this:
with dag:
d1 = DummyOperator(task_id='kick_off_dag')
for i in range(0, 5):
    d2 = DummyOperator(task_id='generate_data_{0}'.format(i))
    d1 >> d2
This will generate 5 tasks downstream from d1.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With