Bear with me since I've just started using Airflow. What I'm trying to do is collect the return code from a BashOperator task, save it to a local variable, and then, based on that return code, branch out to another task. The issue I have is figuring out how to get the BashOperator to return something. The following is my code segment:
dag = DAG(dag_id='dag_1',
          default_args=default_args,
          schedule_interval='0 2 * * *',
          user_defined_macros=user_def_macros,
          dagrun_timeout=timedelta(minutes=60)
          )
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}", dag=dag)
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("oodas") }}"', dag=dag)
t2.set_upstream(oodas)
I'm trying xcom_push but honestly have no idea how it works. Is this the right way to collect the result? In the logs the last line is: Command exited with return code 0.
As per the BashOperator docs:
If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes
Knowing that, you just need to have your bash command print the exit code last, so append the following to your bash_command:
<your code> ; echo $?
In your case, that would be:
oodas = BashOperator(task_id='oodas', xcom_push=True, bash_command="hive -hiveconf SCHEMA={{ schema }} -hiveconf DAY={{ yesterday_ds }} -f {{ script_path }}; echo $?", dag=dag)
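If you then need to branch on that return code, as the question describes, one option is a BranchPythonOperator that pulls the value pushed by oodas. This is only a minimal sketch assuming Airflow 1.x; the downstream task ids success_path and failure_path are hypothetical and would have to exist in your DAG:

from airflow.operators.python_operator import BranchPythonOperator

def choose_branch(**context):
    # Pull the last line of stdout pushed by the 'oodas' task.
    # With the "; echo $?" suffix, that line is the hive command's exit code, as a string.
    return_code = context['ti'].xcom_pull(task_ids='oodas')
    # Return the task_id to follow next; both ids below are placeholders.
    return 'success_path' if return_code == '0' else 'failure_path'

branch = BranchPythonOperator(task_id='branch_on_return_code',
                              python_callable=choose_branch,
                              provide_context=True,  # Airflow 1.x: pass the template context as kwargs
                              dag=dag)
branch.set_upstream(oodas)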
Can you post the entire DAG? I think you are having an issue with how Airflow works.
From Task1 (if it is a BashOperator) you can push a value by setting xcom_push=True, which pushes the last line written to stdout:
t1 = BashOperator(task_id='t1', bash_command='echo "value to push"', xcom_push=True, dag=dag)
And in Task2:
t2 = BashOperator(task_id='t2', bash_command='echo "{{ ti.xcom_pull("t1") }}"', dag=dag)
where ti is the task instance available in the template context, and the {{ }} notation is Jinja templating used to access those template variables.
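Putting the two tasks together, here is a minimal, self-contained sketch of that push/pull pattern (the dag id, start date, and echoed value are placeholders, and it assumes Airflow 1.x import paths):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='xcom_demo', start_date=datetime(2018, 1, 1), schedule_interval=None) as demo_dag:
    # xcom_push=True pushes the last line written to stdout as an XCom.
    t1 = BashOperator(task_id='t1', bash_command='echo "value to push"', xcom_push=True)
    # The pull is rendered into the command by Jinja before the command runs.
    t2 = BashOperator(task_id='t2', bash_command='echo "pulled: {{ ti.xcom_pull("t1") }}"')
    t2.set_upstream(t1)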