I have a stored XCom value that I want to pass to another Python function that is not called through PythonOperator.
def sql_file_template():
    # <some code which uses the XCom value>
    ...
def call_stored_proc(**kwargs):
    # project = kwargs['row_id']
    print("INSIDE CALL STORE PROC ------------")
    query = """CALL `{0}.dataset_name.store_proc`(
        '{1}'      # source table
        , ['{2}']  # row_ids
        , '{3}'    # pivot_col_name
        , '{4}'    # pivot_col_value
        , 100      # max_columns
        , 'MAX'    # aggregation
    );"""
    query = query.format(kwargs['project'], kwargs['source_tbl'], kwargs['row_id'],
                         kwargs['pivot_col'], kwargs['pivot_val'])
    # `client` is assumed to be a BigQuery client created elsewhere in the DAG file.
    job = client.query(query, location="US")
    for result in job.result():
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='query_string', value=result)
        print(result)
        return result  # returns on the first row
bq_cmd = PythonOperator(
    task_id='task1',
    provide_context=True,
    python_callable=call_stored_proc,
    op_kwargs={'project': project,
               'source_tbl': source_tbl,
               'row_id': row_id,
               'pivot_col': pivot_col,
               'pivot_val': pivot_val,
               },
    dag=dag
)
dummy_operator >> bq_cmd
sql_file_template()
The output of the stored procedure is a string, which is captured using XCom.
Now I would like to pass this value to a Python function, sql_file_template, without using PythonOperator.
According to the Airflow documentation, XComs can only be accessed between tasks.
Can anyone help with this?
If you have access to the Airflow installation you'd like to query (configuration, database access, and code), you can use Airflow's airflow.models.XCom.get_one class method:
from datetime import datetime
from airflow.models import XCom
execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
                          task_id="the_task_id",
                          dag_id="the_dag_id")
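To connect this back to the question, one option is to let sql_file_template take the value as an argument and look it up just before the call. The following is only a minimal sketch under some assumptions: `airflow_get_one`, `fetch_query_string`, the `fetch` parameter, and the body of `sql_file_template` are hypothetical names and placeholder logic introduced for illustration. The task ID `task1` and key `query_string` are taken from the question, and the code assumes an Airflow version where `XCom.get_one` accepts `execution_date`, `key`, `task_id` and `dag_id` as keyword arguments. The Airflow import is deferred so the helper can also be tried with a stand-in fetcher.

```python
from datetime import datetime
from typing import Any, Callable, Optional


def airflow_get_one(**filters: Any) -> Any:
    """Default fetcher: delegates to XCom.get_one (needs Airflow's config and DB)."""
    # Deferred import, so the rest of this sketch works without Airflow installed.
    from airflow.models import XCom
    return XCom.get_one(**filters)


def fetch_query_string(
    execution_date: datetime,
    dag_id: str,
    task_id: str = "task1",          # task ID from the question
    key: str = "query_string",       # key used in xcom_push in the question
    fetch: Callable[..., Any] = airflow_get_one,  # hypothetical injection point
) -> Optional[Any]:
    """Look up the value pushed by the task for one specific DAG run."""
    return fetch(
        execution_date=execution_date,
        key=key,
        task_id=task_id,
        dag_id=dag_id,
    )


def sql_file_template(query_string: Optional[str]) -> str:
    """Placeholder body: the real templating logic is not shown in the question."""
    if query_string is None:
        raise ValueError("No XCom value found for the given DAG run and task.")
    return "-- generated from XCom\n{}".format(query_string)
```

With a real installation, this would be called as `sql_file_template(fetch_query_string(datetime(2020, 8, 28), "the_dag_id"))`. The execution date has to match the DAG run that pushed the value, and the task must already have run, so this kind of lookup fits a separate script better than top-level DAG code that is evaluated every time the DAG file is parsed.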