I am using S3ToRedshiftOperator to load a CSV file into a Redshift database, and I need to pass an XCom variable to S3ToRedshiftOperator. How can we push to XCom without using a custom function?
Error:
NameError: name 'ti' is not defined
I am using the code below:
from airflow.operators.s3_to_redshift_operator import S3ToRedshiftOperator

def export_db_fn(**kwargs):
    session = settings.Session()
    outkey = S3_KEY.format(MWAA_ENV_NAME, name[6:])
    print(outkey)
    s3_client.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    ti.xcom_push(key='FILE_PATH', value=outkey)
    return "OK"

with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
    export_info = PythonOperator(
        task_id="export_info",
        python_callable=export_db_fn,
        provide_context=True
    )
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key="{{ ti.xcom_pull(key='FILE_PATH', task_ids='export_info') }}",
        schema="dw_stage",
        table=REDSHIFT_TABLE,
        copy_options=['csv', "IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        autocommit=True,
        task_id='transfer_s3_to_redshift',
    )
    start >> export_info >> transfer_s3_to_redshift >> end
The error message tells you the problem: ti is not defined.
When you set provide_context=True, Airflow passes the task Context to your Python callable as keyword arguments. One of its entries is ti, the current TaskInstance (see the source code). So you need to either extract it from kwargs or declare it in the function signature.
Your code should be:
def export_db_fn(**kwargs):
    ...
    ti = kwargs['ti']
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...
Or if you want to use ti directly then:
def export_db_fn(ti, **kwargs):
    ...
    ti.xcom_push(key='FILE_PATH', value=outkey)
    ...
Note: In Airflow >= 2.0 there is no need to set provide_context=True; the context (including ti) is passed to the callable automatically.
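As for the "without a custom function" part of the question: PythonOperator pushes the callable's return value to XCom under the key return_value by default, so you can simply return the S3 key and pull it with ti.xcom_pull(task_ids='export_info') (no key argument needed). Here is a minimal sketch, assuming Airflow 2.x with the Amazon provider package installed; the bucket, table and key names are placeholders standing in for the variables from the question:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.utils.dates import days_ago

def export_db_fn():
    outkey = "exports/my_file.csv"  # placeholder: build your real S3 key here
    # ... upload the file to S3 ...
    return outkey  # pushed to XCom automatically under the key 'return_value'

with DAG(dag_id="export_info", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag:
    export_info = PythonOperator(
        task_id="export_info",
        python_callable=export_db_fn,
    )
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id="transfer_s3_to_redshift",
        s3_bucket="my-bucket",  # placeholder for S3_BUCKET
        # xcom_pull without a key defaults to 'return_value'
        s3_key="{{ ti.xcom_pull(task_ids='export_info') }}",
        schema="dw_stage",
        table="my_table",  # placeholder for REDSHIFT_TABLE
        copy_options=['csv', "IGNOREHEADER 1"],
        redshift_conn_id='redshift',
        autocommit=True,
    )
    export_info >> transfer_s3_to_redshift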