I am new to Airflow.
I have come across a scenario, where Parent DAG need to pass some dynamic number (let's say n) to Sub DAG.
Where as SubDAG will use this number to dynamically create n parallel tasks. 
Airflow documentation doesn't cover a way to achieve this. So I have explore couple of ways :
I have tried to pass as a xcom value, but for some reason SubDAG is not resolving to the passed value.
Parent Dag File
def load_dag(**kwargs):
    number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs'])
    dag_data = json.dumps({
        "number_of_runs": number_of_runs
    })
    return dag_data
# ------------------ Tasks ------------------------------
load_config = PythonOperator(
    task_id='load_config',
    provide_context=True,
    python_callable=load_dag,
    dag=dag)
t1 = SubDagOperator(
    task_id=CHILD_DAG_NAME,
    subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ),
    default_args=default_args,
    dag=dag,
)
Sub Dag File
def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval=None)
    variabe_names = {}
    for i in range(num_of_runs):
        variabe_names['task' + str(i + 1)] =  DummyOperator(
        task_id='dummy_task',
        dag=dag_subdag,
    )
    return dag_subdag
I have also tried to pass number_of_runs as a global variable, which was not working.  
Also we tried to write this value to a data file. But sub DAG is throwing File doesn't exist error. This might be because we are dynamically generating this file.  
Can some one help me with this.
You can pass parameters from the CLI using --conf '{"key":"value"}' and then use it in the DAG file as "{{ dag_run. conf["key"] }}" in templated field.
Dynamic DAGs with globals() You can dynamically generate DAGs by working with globals() . As long as a DAG object in globals() is created, Airflow will load it.
DAGs are stored in the DAGs directory in Airflow, from this directory Airflow's Scheduler looks for file names with dag or airflow strings and parses all the DAGs at regular intervals, and keeps updating the metadata database about the changes (if any).
DAGs. In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAGs structure (tasks and their dependencies) as code.
I've done it with Option 3. The key is to return a valid dag with no tasks, if the file does not exist. So load_config will generate a file with your number of tasks or more information if needed. Your subdag factory would look something like:
def subdag(...):
    sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1))
    file_path = "/path/to/generated/file"
    if os.path.exists(file_path):
        data_file = open(file_path)
        list_tasks = data_file.readlines()
        for task in list_tasks:
            DummyOperator(
                  task_id='task_'+task,
                  default_args=args,
                  dag=sdag,
            )
    return sdag
At dag generation you will see a subdag with No tasks. At dag execution, after load_config is done, you can see you dynamically generated subdag
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With