I want to store data from SQL in a Pandas dataframe, do some data transformations, and then load the result into another table using Airflow.
The issue I am facing is that the connection strings to the tables are accessible only through Airflow, so I need to use Airflow as the medium to read and write the data.
My code:
Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip LIMIT 5",
    params={'limit': '50'},
    dag=dag,
)
The output of the task needs to be stored in a dataframe (df), transformed, and then loaded back into another table.
How can this be done?
Here is a very simple and basic example to read data from a database into a dataframe.
    # Get the hook ("Employees" is the Airflow connection id)
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")
Kudos to y2k-shubham for the get_pandas_df() tip.
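Since the question reads from Redshift through a Postgres connection, the same pattern should work with PostgresHook, because get_pandas_df() is inherited from the shared DbApiHook interface. A minimal sketch, reusing the 'REDSHIFT_CONN' connection id and the query from the question:
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # 'REDSHIFT_CONN' is the connection id from the question
    redshift = PostgresHook(postgres_conn_id="REDSHIFT_CONN")
    # get_pandas_df comes from the common DbApiHook interface
    df = redshift.get_pandas_df(sql="SELECT * FROM Western.trip LIMIT 5")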
I also save the dataframe to a file to pass it to the next task (this is not recommended when using clusters, since the next task could be executed on a different server).
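As a side note, for a small dataframe you could skip the intermediate file and push a serialized copy straight through XCom. A minimal sketch, assuming the data fits comfortably in the metadata database:
    # In the upstream task: push a small dataframe as JSON through XCom
    task_instance.xcom_push(key="df_json", value=df.to_json())

    # In the downstream task: pull and rebuild the dataframe
    df = pd.read_json(task_instance.xcom_pull(key="df_json"))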
This full code should work as it is.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.mysql.hooks.mysql import MySqlHook
dag_id = "db_test"
args = {
    "owner": "airflow",
}
base_file_path = "dags/files/"
def export_func(task_instance):
    import time
    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")
    # Generate somewhat unique filename
    path = "{}{}_{}.ftr".format(base_file_path, dag_id, int(time.time()))
    # Save as a binary Feather file (requires pyarrow)
    df.to_feather(path)
    print("Export done")
    # Push the path to xcom
    task_instance.xcom_push(key="path", value=path)
def import_func(task_instance):
    import pandas as pd
    # Get the path from xcom
    path = task_instance.xcom_pull(key="path")
    # Read the binary file
    df = pd.read_feather(path)
    print("Import done")
    # Do what you want with the dataframe
    print(df)
with DAG(
    dag_id,
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["test"],
) as dag:
    export_task = PythonOperator(
        task_id="export_df",
        python_callable=export_func,
    )
    import_task = PythonOperator(
        task_id="import_df",
        python_callable=import_func,
    )
    export_task >> import_task
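To also load the transformed dataframe back into another table (the last part of the question), import_func can be extended to write the rows through the same hook. A minimal sketch, where "target_table" is a hypothetical destination table and insert_rows() comes from the common DbApiHook:
def import_func(task_instance):
    import pandas as pd
    # Get the path from xcom and read the dataframe back
    path = task_instance.xcom_pull(key="path")
    df = pd.read_feather(path)
    # ... apply your transformations to df here ...
    # Write the result back through the same connection;
    # "target_table" is a hypothetical destination table
    mysqlserver = MySqlHook("Employees")
    mysqlserver.insert_rows(
        table="target_table",
        rows=df.itertuples(index=False),
        target_fields=list(df.columns),
    )
Alternatively, if SQLAlchemy is installed, df.to_sql("target_table", mysqlserver.get_sqlalchemy_engine(), if_exists="append", index=False) achieves the same in one call.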