
Airflow Task Succeeded But Not All Data Ingested

I have an airflow task to extract data with this flow

PostgreSQL -> Google Cloud Storage -> BigQuery

The problem I have is that it seems not all of the data is ingested into BigQuery. In the PostgreSQL source, the table has 18M+ rows, but after ingestion it only has 4M+ rows.

When I check production, this query returns 18M+ rows:

SELECT COUNT(1) FROM my_table

-- This returns 18M+ rows

But after the DAG has finished running, when I check in BigQuery:

SELECT COUNT(1) FROM data_lake.my_table

-- This returns 4M+ rows

Note that not all of the tables I ingest behave like this. All of the smaller tables are ingested just fine, but once a table reaches a certain number of rows, this happens.

My suspicion is that the problem happens when the data is extracted from PostgreSQL to Google Cloud Storage, so I'll provide my function here:

    def create_operator_write_append_init(self, worker=10):
        worker_var  = dict()
        with TaskGroup(group_id=self.task_id_init) as tg1:
            for i in range(worker):
                worker_var[f'worker_{i}'] = PostgresToGCSOperator(
                    task_id = f'worker_{i}',
                    postgres_conn_id = self.conn_id,
                    sql = 'extract_init.sql',
                    bucket = self.bucket,
                    filename = f'{self.filename_init}_{i}.{self.export_format}',
                    export_format = self.export_format, # the export format is json
                    gzip = True,
                    params = {
                        'worker': i
                    }
                )
        return tg1

and here is the SQL file:

SELECT id,
       name,
       created_at,
       updated_at,
       deleted_at
FROM my_table
WHERE 1=1
AND ABS(MOD(hashtext(id::TEXT), 10)) = {{params.worker}};

What I did is chunk the data and split it across several workers, hence the TaskGroup.
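
To rule out the split itself, one thing I can do is verify on the source side that the ten hash buckets together cover every row. A quick sketch of that check (the psycopg2 connection string below is just a placeholder, not part of the DAG):

# Quick diagnostic sketch; psycopg2 and the DSN below are placeholders,
# not part of the actual pipeline.
import psycopg2

conn = psycopg2.connect("dbname=mydb user=me host=localhost")  # placeholder DSN

with conn, conn.cursor() as cur:
    # Same bucketing expression as extract_init.sql: every row falls into
    # exactly one of the 10 buckets, so the bucket counts must add up to
    # the 18M+ total if the split itself is not losing rows.
    cur.execute("""
        SELECT ABS(MOD(hashtext(id::TEXT), 10)) AS worker, COUNT(*) AS n
        FROM my_table
        GROUP BY 1
        ORDER BY 1
    """)
    per_worker = cur.fetchall()

total = sum(n for _, n in per_worker)
for worker, n in per_worker:
    print(f"worker_{worker}: {n}")
print(f"total: {total}")  # should equal SELECT COUNT(1) FROM my_table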

To provide more information, I use Composer:

  • composer-2.0.32-airflow-2.3.4

  • Large instance

  • Worker 8CPU

  • Worker 32GB Memory

  • Worker 2GB storage

  • Worker between 1-16

What are the possible causes of this?

asked by Iqbal

2 Answers

PostgresToGCSOperator inherits from BaseSQLToGCSOperator (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/sql_to_gcs/index.html).

According to the source code, approx_max_file_size_bytes defaults to 1900000000, so each file the operator writes is capped at roughly 1.9 GB. If you split your table into 10 parts (or workers, let's say) and one worker's chunk is bigger than that, the operator writes it as several files, and because you did not tell PostgresToGCSOperator to create "chunks of your chunk" with distinct names, each new file replaces the previous one.
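
A quick way to confirm this is to list what each worker actually left behind in GCS: if the overwrite theory is right, each worker prefix will contain only a single object instead of several part files. A rough sketch with the google-cloud-storage client (the bucket name and filename prefix below are placeholders, not from the question):

# Rough check of what each worker wrote to GCS.
# Bucket name and filename prefix below are placeholders.
from google.cloud import storage

client = storage.Client()
for i in range(10):
    prefix = f"my_table_init_{i}"
    blobs = list(client.list_blobs("my-bucket", prefix=prefix))
    total_bytes = sum(b.size or 0 for b in blobs)
    print(f"{prefix}: {len(blobs)} object(s), {total_bytes} bytes")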

You can do it by adding a {} placeholder to the filename and the operator will handle it:

def create_operator_write_append_init(self, worker=10):
    worker_var = dict()
    with TaskGroup(group_id=self.task_id_init) as tg1:
        for i in range(worker):
            worker_var[f'worker_{i}'] = PostgresToGCSOperator(
                task_id = f'worker_{i}',
                postgres_conn_id = self.conn_id,
                sql = 'extract_init.sql',
                bucket = self.bucket,
                filename = f'{self.filename_init}_{i}_part_{{}}.{self.export_format}',
                export_format = self.export_format, # the export format is json
                gzip = True,
                params = {
                    'worker': i
                }
            )
    return tg1
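
To see concretely why the missing placeholder causes silent overwrites: as far as I can tell from the provider source, the operator fills the placeholder with a running part number via Python's str.format. A small illustration (the file names below are made up, not output from the actual DAG):

# With the '{}' placeholder, every ~1.9 GB part gets its own object name
# (hypothetical names, just to show the pattern):
template = "my_table_init_0_part_{}.json"
print([template.format(n) for n in range(3)])
# ['my_table_init_0_part_0.json', 'my_table_init_0_part_1.json',
#  'my_table_init_0_part_2.json']

# Without a placeholder, str.format() leaves the name unchanged, so every
# part is uploaded to the same object and overwrites the previous one:
print("my_table_init_0.json".format(0))  # 'my_table_init_0.json'
print("my_table_init_0.json".format(1))  # 'my_table_init_0.json'
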
answered by Michal Volešíni


You can definitely explore the Apache 2.0 licensed Astro SDK, maintained by Astronomer, which allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.

In this case, aql.transform_file can be used to run the SQL query from the .sql file and select the data from Postgres, aql.export_to_file() would export the data from the Postgres table to the GCS bucket, and finally aql.load_file() can be used to load the file from GCS into BigQuery. Following is an example DAG:

from airflow.models.dag import DAG

from astro.files import File
from astro.constants import FileType
from astro.table import Table
from astro.sql.operators.load_file import load_file
from astro.sql.operators.export_to_file import export_to_file
from astro.sql.operators.transform import transform_file
from datetime import datetime
from pathlib import Path

POSTGRES_CONN_ID = "postgres_conn"

with DAG(
        dag_id="sample-dag",
        schedule_interval=None,
        start_date=datetime(2022, 1, 1),
        catchup=False,
) as dag:
    postgres_table = Table(name="my_table", temp=True, conn_id=POSTGRES_CONN_ID)

    postgres_data = transform_file(
        file_path=f"{Path(__file__).parent.as_posix()}/transform.sql",
        parameters={"input_table": postgres_table},
    )


    save_file_to_gcs = export_to_file(
        task_id="save_file_to_gcs",
        input_data=postgres_data,
        output_file=File(
            path="gs://astro-sdk/all_postgres_data.csv",
            conn_id="gcp_conn",
        ),
        if_exists="replace",
    )

    load_data_to_bq = load_file(
        input_file=File(
            "gs://astro-sdk/all_postgres_data.csv",
            conn_id="gcp_conn",
            filetype=FileType.CSV,
        ),
        output_table=Table(conn_id="gcp_conn"),
        use_native_support=False,
        native_support_kwargs={
            "ignore_unknown_values": True,
            "allow_jagged_rows": True,
            "skip_leading_rows": "1",
        },
        enable_native_fallback=True,
    )
    load_data_to_bq.set_upstream(save_file_to_gcs)
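
The DAG above expects a transform.sql file next to it. A minimal version mirroring the extract query from the question could look like the following (hypothetical content; {{ input_table }} is rendered by the Astro SDK from the parameters dict passed to transform_file):

# Hypothetical transform.sql for the DAG above; the column list mirrors the
# extract query from the question, and {{ input_table }} refers to the
# "input_table" entry in the parameters dict.
from pathlib import Path

TRANSFORM_SQL = """\
SELECT id,
       name,
       created_at,
       updated_at,
       deleted_at
FROM {{ input_table }}
"""

# Write it next to the DAG file so the file_path built from
# Path(__file__).parent in the DAG resolves when the DAG is parsed.
Path(__file__).with_name("transform.sql").write_text(TRANSFORM_SQL)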

Adding a screenshot of the DAG run for reference: DAG screenshot

Hence, using astro-sdk-python instead would simplify the approach.

We have various operators and decorators as part of this project, which is documented here: https://astro-sdk-python.readthedocs.io/

Disclaimer: I work at Astronomer, which develops Astro SDK as an Open Source project.

answered by Ankit Chaurasia


