Reading partitioned multi-schema parquet files from S3 using Polars

I have 1000+ S3 files under a partitioned path and want to read all of them. I'm using Polars because it is faster than Pandas. The paths look like this:

s3://bucket_name/rs_tables/name='part1'/key='abc'/date=''/part1_0000.parquet

I'm scanning these files with Polars:

    import polars as pl

    source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
    # access_key, secret_key and token hold the AWS credentials
    storage_options = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": token
    }

    lazyFrame = pl.scan_parquet(source, storage_options=storage_options)
    lazyFrame.collect()

Because the files have different schemas, the code throws a ComputeError:

ComputeError: schema of all files in a single scan_parquet must be equal

Is there an option like Spark's mergeSchema? Any suggestions for solving this would be appreciated.

asked Oct 22 '25 10:10 by Kavya shree


1 Answer

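Unfortunately, scan_parquet doesn't have that option.

However, the pl.concat function supports a "vertical_relaxed" combination of frames, which coerces mismatched column types to a common supertype, so you can use that instead.

There are two steps to this workaround.

The first is to get the list of files that the glob expands to. scan_parquet doesn't return that list directly, so one way is to serialize the lazy query plan and parse the paths out of its JSON. Be aware that the plan layout is an internal detail that can change between Polars versions. The second step is to use a list comprehension to call scan_parquet on each file individually and wrap the result in pl.concat with how="vertical_relaxed".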

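    import json

    import polars as pl

    source = "s3://bucket_name/rs_tables/*/*/*/*.parquet"
    storage_options = {
        "aws_access_key_id": access_key,
        "aws_secret_access_key": secret_key,
        "aws_session_token": token
    }

    # Step 1: read the resolved file list out of the serialized query plan.
    # The plan layout is internal and varies between Polars versions
    # (on Polars 1.x, serialize() defaults to binary; pass format="json" there).
    file_paths = json.loads(
        pl.scan_parquet(source, storage_options=storage_options).serialize()
    )["Scan"]["paths"]

    # Step 2: scan each file on its own so each keeps its own schema, then stack
    # them; vertical_relaxed coerces mismatched dtypes to a common supertype.
    lazyframe = pl.concat(
        [pl.scan_parquet(x, storage_options=storage_options) for x in file_paths],
        how="vertical_relaxed",
    )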

If the schemas differ more than vertical_relaxed can reconcile, you may need to normalize each frame yourself with select or with_columns (select is better because it makes every column explicit), applying casts and transformations inside the concat. Once every frame has the same schema, you no longer need how='vertical_relaxed'. A made-up example might look like this:

    lazyframe = pl.concat([
        pl.scan_parquet(x, storage_options=storage_options)
        .select(
            # a bare string in select refers to the column of that name
            name="name",
            key="key",
            date=pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d"),
            var1=pl.col('var1').cast(pl.Float64)
        )
        for x in file_paths
    ])

I opened an enhancement request to add this option to scan_parquet so it's easier to do.

answered Oct 24 '25 04:10 by Dean MacGregor
