
How to access Xcom value in a non airflow operator python function

Tags:

airflow

I have a stored XCom value that I want to pass to another Python function, one that is not called through a PythonOperator.


def sql_file_template():
    <some code which uses xcom variable>

def call_stored_proc(**kwargs):
    # `client` is assumed to be a BigQuery client created elsewhere in the DAG file
    print("INSIDE CALL STORE PROC ------------")
    query = """CALL `{0}.dataset_name.store_proc`(
                      '{1}' # source table
                    , ['{2}'] # row_ids
                    , '{3}' # pivot_col_name
                    , '{4}' # pivot_col_value
                    ,  100 # max_columns
                    , 'MAX' # aggregation
            );"""
    query = query.format(kwargs['project'], kwargs['source_tbl'], kwargs['row_id'],
                         kwargs['pivot_col'], kwargs['pivot_val'])
    job = client.query(query, location="US")
    # Push the first result row to XCom and return it
    for result in job.result():
        task_instance = kwargs['task_instance']
        task_instance.xcom_push(key='query_string', value=result)
        print(result)
        return result



bq_cmd = PythonOperator(
    task_id=                    'task1',
    provide_context=            True,
    python_callable=            call_stored_proc,
    op_kwargs=                  {'project'        : project,
                                 'source_tbl'     : source_tbl,
                                 'row_id'         : row_id,
                                 'pivot_col'      : pivot_col,
                                 'pivot_val'      : pivot_val
                                },
    dag=                        dag
)

dummy_operator >> bq_cmd
sql_file_template()

The output of the stored procedure is a string, which is captured using XCom.

Now I would like to pass this value to a plain Python function, sql_file_template, without using a PythonOperator.

As per the Airflow documentation, XCom values can only be accessed between tasks.

Can anyone help with this?

asked Oct 15 '25 15:10 by codninja0908


1 Answer

If you have access to the Airflow installation you'd like to query (configuration, database access, and code), you can use the airflow.models.XCom.get_one class method:

from datetime import datetime

from airflow.models import XCom


execution_date = datetime(2020, 8, 28)
xcom_value = XCom.get_one(execution_date=execution_date,
                          task_id="the_task_id",
                          dag_id="the_dag_id")            
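
To connect this back to the question, here is a minimal sketch of how sql_file_template could consume the value. It assumes the lookup is passed in as a callable (the `fetch_xcom` parameter is hypothetical), so the function can be tried out without a metadata database. The view name in the template is also made up for illustration, and the exact get_one signature may differ between Airflow versions.

```python
from datetime import datetime
from typing import Callable, Optional


def sql_file_template(fetch_xcom: Callable[[], Optional[str]]) -> str:
    """Render a SQL snippet around a value that was stored in XCom.

    fetch_xcom is a hypothetical zero-argument callable that returns
    the stored string, or None when no matching XCom row exists.
    """
    query_string = fetch_xcom()
    if query_string is None:
        raise ValueError("No XCom value found for the requested task and date")
    # Hypothetical template; the real one depends on the asker's SQL file.
    return "CREATE OR REPLACE VIEW pivot_view AS\n{0}".format(query_string)


def fetch_from_airflow() -> Optional[str]:
    """Look the value up through XCom.get_one, as in the answer above."""
    # Imported lazily so this module also loads where Airflow is not installed.
    from airflow.models import XCom

    return XCom.get_one(execution_date=datetime(2020, 8, 28),
                        key="query_string",   # key pushed by call_stored_proc
                        task_id="task1",
                        dag_id="the_dag_id")


# Exercise the function with a stub instead of a live Airflow database.
rendered = sql_file_template(lambda: "SELECT 1")
print(rendered)
```

With a live installation, the call would be sql_file_template(fetch_from_airflow). Note that a call placed at the top level of the DAG file, as in the question, runs when the file is parsed rather than after task1 has finished, so the value may not exist yet at that point.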
answered Oct 18 '25 07:10 by joebeeson


