How to store SQL output in a pandas dataframe using Airflow?

Tags:

airflow

I want to read data from SQL into a pandas dataframe, do some transformations, and then load the result into another table using Airflow.

The issue I am facing is that the connection strings to the tables are accessible only through Airflow, so I need to use Airflow as the medium to read and write the data.

How can this be done?

My code:

Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip LIMIT 5",
    params={'limit': '50'},
    dag=dag
)

The output of the task needs to be stored in a dataframe (df) and, after transformations, loaded back into another table.


Rahul rajan asked Oct 27 '25 05:10

1 Answer

Here is a very simple and basic example to read data from a database into a dataframe.

    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")

Kudos to y2k-shubham for the get_pandas_df() tip.

I also save the dataframe to a file to pass it to the next task (this is not recommended when using clusters, since the next task could be executed on a different server).

The full code below should work as is.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.mysql.hooks.mysql import MySqlHook

dag_id = "db_test"
args = {
    "owner": "airflow",
}

base_file_path = "dags/files/"

def export_func(task_instance):
    import time

    # Get the hook
    mysqlserver = MySqlHook("Employees")
    # Execute the query
    df = mysqlserver.get_pandas_df(sql="select * from employees LIMIT 10")

    # Generate somewhat unique filename
    path = "{}{}_{}.ftr".format(base_file_path, dag_id, int(time.time()))
    # Save as a binary feather file
    df.to_feather(path)
    print("Export done")

    # Push the path to xcom
    task_instance.xcom_push(key="path", value=path)


def import_func(task_instance):
    import pandas as pd

    # Get the path from xcom
    path = task_instance.xcom_pull(key="path")
    # Read the binary file
    df = pd.read_feather(path)

    print("Import done")
    # Do what you want with the dataframe
    print(df)

with DAG(
    dag_id,
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=["test"],
) as dag:

    export_task = PythonOperator(
        task_id="export_df",
        python_callable=export_func,
    )

    import_task = PythonOperator(
        task_id="import_df",
        python_callable=import_func,
    )

    export_task >> import_task
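The answer above covers reading into a dataframe; the question also asks about loading the transformed data back into another table. Inside Airflow you would do that with the hook as well (e.g. `hook.insert_rows(...)` or `df.to_sql(...)` with the hook's connection). As a minimal standalone sketch of the read → transform → load pattern, here is the same flow using stdlib `sqlite3` in place of the Airflow connection; the table names (`trip`, `trip_clean`) and the miles-to-kilometers transform are made up for illustration:

```python
import sqlite3

# In-memory database standing in for the Airflow-managed connection
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Source table with a few sample rows
cur.execute("CREATE TABLE trip (id INTEGER, miles REAL)")
cur.executemany("INSERT INTO trip VALUES (?, ?)",
                [(1, 3.0), (2, 7.5), (3, 12.0)])

# 1) Read the query result (inside Airflow: hook.get_pandas_df(sql=...))
rows = cur.execute("SELECT id, miles FROM trip").fetchall()

# 2) Transform: convert miles to kilometers
transformed = [(trip_id, round(miles * 1.60934, 2)) for trip_id, miles in rows]

# 3) Load into another table (inside Airflow: hook.insert_rows or df.to_sql)
cur.execute("CREATE TABLE trip_clean (id INTEGER, km REAL)")
cur.executemany("INSERT INTO trip_clean VALUES (?, ?)", transformed)
conn.commit()
```

Doing all three steps inside a single task also sidesteps the cluster caveat mentioned above, since no file has to survive between tasks.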
Thomas J answered Oct 30 '25 15:10

