 

How to find the optimum Spark-Athena file size

I have a Spark job that writes to an S3 bucket, with an Athena table on top of that location. The table is partitioned. Spark was writing a single 1 GB file per partition. We experimented with the maxRecordsPerFile option so that only about 500 MB of data is written per file, and in this case we ended up with two files of roughly 500 MB each. This saved 15 minutes of run-time on EMR. However, there was a problem with Athena: query CPU time started getting worse with the new file size limit. I compared the same data with the same query before and after the change, and this is what I found:
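Roughly, the write is set up like this (a simplified sketch, not the actual job; the DataFrame name, S3 paths and the record limit are placeholders):

from pyspark.sql import SparkSession

# Build/obtain the session and the DataFrame that the job produces.
spark = SparkSession.builder.appName("athena-file-size-test").getOrCreate()
df = spark.read.parquet("s3://my-bucket/source/")  # placeholder input

# maxRecordsPerFile caps the number of rows per output file, which is how the
# single ~1 GB file per partition was split into two ~500 MB files.
(df.write
   .option("maxRecordsPerFile", 1300000)  # illustrative value, tuned to land near 500 MB
   .partitionBy("source_system", "execution_date", "year_month_day")
   .mode("overwrite")
   .parquet("s3://my-bucket/dw/table/"))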

Partition columns = source_system, execution_date, year_month_day

Query we tried:

select *
from dw.table
where source_system = 'SS1'
and year_month_day = '2022-09-14'
and product_vendor = 'PV1'
and execution_date = '2022-09-14'
and product_vendor_commission_amount is null
and order_confirmed_date is not null
and filter = 1
order by product_id 
limit 100;

Execution time: Before: 6.79s, After: 11.102s

EXPLAIN ANALYZE showed that the new structure had to scan more data.

Before: CPU: 13.38s, Input: 2619584 rows (75.06MB), Data Scanned: 355.04MB; per task: std.dev.: 77434.54, Output: 18 rows (67.88kB)

After: CPU: 20.23s, Input: 2619586 rows (74.87MB), Data Scanned: 631.62MB; per task: std.dev.: 193849.09, Output: 18 rows (67.76kB)

Can you please guide me on why this takes almost double the time? What are the things to look out for? Is there a sweet spot for file size that would be optimal for the Spark and Athena combination?

Gladiator asked Sep 04 '25 17:09

1 Answer

One hypothesis is that predicate pushdown filters are more effective with the single-file strategy.

From the AWS Big Data Blog post titled Top 10 Performance Tuning Tips for Amazon Athena:

Parquet and ORC file formats both support predicate pushdown (also called predicate filtering). Both formats have blocks of data that represent column values. Each block holds statistics for the block, such as max/min values. When a query is being run, these statistics determine whether the block should be read or skipped depending on the filter value used in the query. This helps reduce data scanned and improves the query runtime. To use this capability, add more filters in the query (for example, using a WHERE clause).

One way to optimize the number of blocks to be skipped is to identify and sort by a commonly filtered column before writing your ORC or Parquet files. This ensures that the range between the min and max of values within the block are as small as possible within each block. This gives it a better chance to be pruned and also reduces data scanned further.
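To see how much pruning potential the current files actually have, one option (not from the post; a sketch using pyarrow, with a placeholder file name and product_vendor as the example column) is to dump the per-row-group min/max statistics:

import pyarrow.parquet as pq

# Inspect the row-group statistics that Athena/Presto uses for predicate pushdown.
pf = pq.ParquetFile("part-00000-xxxx.snappy.parquet")  # placeholder file name
for i in range(pf.metadata.num_row_groups):
    rg = pf.metadata.row_group(i)
    for j in range(rg.num_columns):
        col = rg.column(j)
        if col.path_in_schema != "product_vendor":
            continue
        stats = col.statistics
        if stats is not None and stats.has_min_max:
            print(f"row group {i}: rows={rg.num_rows}, min={stats.min}, max={stats.max}")

Wide, overlapping min/max ranges across row groups mean almost every block has to be read; narrow, non-overlapping ranges are what makes pruning effective.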

To test this, I would suggest running another experiment if possible. Change the Spark job to sort the data before persisting it into the two files, using the following column order: source_system, execution_date, year_month_day, product_vendor, product_vendor_commission_amount, order_confirmed_date, filter and product_id. Then check the query statistics again, as in the sketch below.
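A minimal PySpark sketch of that experiment (my interpretation; it assumes the job's DataFrame is called df, keeps the maxRecordsPerFile setting from the question, and uses sortWithinPartitions as one way to get the ordering into each output file):

# Sort by the commonly filtered columns so each Parquet row group covers a
# narrow min/max range and can be skipped by Athena's predicate pushdown.
sorted_df = df.sortWithinPartitions(
    "source_system", "execution_date", "year_month_day",
    "product_vendor", "product_vendor_commission_amount",
    "order_confirmed_date", "filter", "product_id",
)

(sorted_df.write
    .option("maxRecordsPerFile", 1300000)  # keep the two-file setup from the question
    .partitionBy("source_system", "execution_date", "year_month_day")
    .mode("overwrite")
    .parquet("s3://my-bucket/dw/table_sorted/"))  # placeholder output path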

That way the dataset would at least be optimised for the presented use case. Otherwise, adjust the sort order according to your heaviest queries.

The post also comments on optimal file sizes and gives a general rule of thumb. From my experience, Spark works well with file sizes between 128 MB and 2 GB; that range should also be fine for other query engines such as Presto, which Athena is built on.
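If you want to land near a particular file size, one crude back-of-the-envelope approach (my assumption, not from the post) is to derive maxRecordsPerFile from the observed compressed bytes per row:

# Placeholder numbers loosely based on the question: ~2.6M rows producing
# roughly 1 GB of compressed Parquet in a partition.
target_file_bytes = 512 * 1024 * 1024      # aim for ~512 MB files
observed_rows = 2619584
observed_bytes = 1 * 1024 * 1024 * 1024    # ~1 GB on disk for those rows

bytes_per_row = observed_bytes / observed_rows
max_records_per_file = int(target_file_bytes / bytes_per_row)
print(max_records_per_file)                # pass this to .option("maxRecordsPerFile", ...)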

Emer answered Sep 07 '25 17:09