In my Airflow Spark jobs, I need to pass the Spark job stats to other tasks in the workflow. How can I push a value from SparkSubmitOperator to XCom?
task1 = SparkSubmitOperator(
    task_id='spark_task',
    conn_id='spark_default',
    java_class='com.example',
    application='example.jar',
    name='spark-job',
    verbose=True,
    application_args=["10"],
    conf={'master': 'yarn'},
    dag=dag,
)
# pass value from task1 to task2 via XCom
def somefunc(**kwargs):
    # pull the value pushed by task1
    value = kwargs["ti"].xcom_pull(task_ids='spark_task')
    print(value)

task2 = PythonOperator(
    task_id='task2',
    python_callable=somefunc,
    provide_context=True,
    dag=dag,
)
Currently, SparkSubmitOperator/SparkSubmitHook aren't designed to return job stats to XCom. However, you can easily subclass the operator to accommodate your needs:
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

class SparkSubmitOperatorXCom(SparkSubmitOperator):
    def execute(self, context):
        super().execute(context)
        return self._hook._driver_status
Then initialise the operator with `do_xcom_push=True` so that the return value of `execute` is pushed to XCom:
task1 = SparkSubmitOperatorXCom(
    do_xcom_push=True,
    ...
)
Note: in this case we are accessing a private attribute; that is the only way SparkSubmitHook exposes the driver status. For more complex job stats you will have to implement your own solution, as the hook isn't flexible enough to provide everything for you.
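For example, if you need several stats at once, you can return a dict from `execute` and Airflow will push the whole dict to XCom. The sketch below uses hypothetical `Fake*` stand-ins for SparkSubmitHook/SparkSubmitOperator so it runs without an Airflow installation; in a real DAG you would subclass SparkSubmitOperator itself, exactly as above. The `duration_seconds` field is an assumption, just to illustrate collecting your own metrics around the submit call.

```python
import time


class FakeSparkSubmitHook:
    """Hypothetical stand-in for SparkSubmitHook (only for this sketch)."""

    def __init__(self):
        self._driver_status = None

    def submit(self, application):
        # Pretend spark-submit ran and the driver finished successfully.
        self._driver_status = "FINISHED"


class FakeSparkSubmitOperator:
    """Hypothetical stand-in for SparkSubmitOperator (only for this sketch)."""

    def __init__(self, application):
        self._application = application
        self._hook = None

    def execute(self, context):
        self._hook = FakeSparkSubmitHook()
        self._hook.submit(self._application)


class SparkSubmitStatsOperator(FakeSparkSubmitOperator):
    """Run the normal spark-submit flow, then return a dict of stats.

    With do_xcom_push=True, Airflow pushes execute()'s return value to XCom,
    so downstream tasks can xcom_pull the whole dict.
    """

    def execute(self, context):
        started = time.time()
        super().execute(context)
        return {
            "driver_status": self._hook._driver_status,
            "duration_seconds": round(time.time() - started, 3),
        }


stats = SparkSubmitStatsOperator(application="example.jar").execute(context={})
print(stats["driver_status"])  # FINISHED
```

A downstream PythonOperator would then read individual fields with `kwargs["ti"].xcom_pull(task_ids='spark_task')["driver_status"]`.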