Spark: how to write dataframe to S3 efficiently

I am trying to figure out the best way to write data to S3 using (Py)Spark.

Reading from the S3 bucket works fine, but writing to it is really slow.

I've started the spark shell like so (including the hadoop-aws package):

AWS_ACCESS_KEY_ID=<key_id> AWS_SECRET_ACCESS_KEY=<secret_key> pyspark --packages org.apache.hadoop:hadoop-aws:3.2.0

This is the sample application:

# Load several csv files from S3 to a Dataframe (no problems here)
df = spark.read.csv(path='s3a://mybucket/data/*.csv', sep=',')
df.show()

# Some processing
result_df = do_some_processing(df)
result_df.cache()
result_df.show()

# Write to S3
result_df.write.partitionBy('my_column').csv(path='s3a://mybucket/output', sep=',')  # This is really slow

When I try to write to S3, I get the following warning:

20/10/28 15:34:02 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.

Is there any setting I should change to write to S3 efficiently? Right now it is really slow: it took about 10 minutes to write 100 small files to S3.

1 Answer

It turns out you have to specify the committer manually, otherwise the default FileOutputCommitter is used, which is not optimized for S3:

# 'bytebuffer' buffers uploads in memory instead of on disk: potentially faster but more memory intensive
result_df \
    .write \
    .partitionBy('my_column') \
    .option('fs.s3a.committer.name', 'partitioned') \
    .option('fs.s3a.committer.staging.conflict-mode', 'replace') \
    .option('fs.s3a.fast.upload.buffer', 'bytebuffer') \
    .mode('overwrite') \
    .csv(path='s3a://mybucket/output', sep=',')
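
If you prefer not to repeat these options on every write, an alternative is to set the equivalent Hadoop properties once when the session is created. Below is a minimal sketch, assuming Spark 3.x with hadoop-aws 3.2.0 on the classpath; the app name and paths are placeholders, and depending on your Spark build you may also need the spark-hadoop-cloud module so that Spark's commit protocol actually delegates to the S3A committers.

from pyspark.sql import SparkSession

# Sketch only: the spark.hadoop.* prefix pushes these values into the Hadoop
# configuration that the S3A connector reads.
spark = (
    SparkSession.builder
    .appName('s3a-committer-example')  # hypothetical app name
    # Route committer selection for s3a:// paths through the S3A committer factory
    .config('spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a',
            'org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory')
    # Same settings as the per-write .option() calls above, applied once per session
    .config('spark.hadoop.fs.s3a.committer.name', 'partitioned')
    .config('spark.hadoop.fs.s3a.committer.staging.conflict-mode', 'replace')
    .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer')
    .getOrCreate()
)

# The write itself then no longer needs the committer options
result_df.write.partitionBy('my_column').mode('overwrite').csv(path='s3a://mybucket/output', sep=',')

With the committer configured at session level, every S3 write in the application picks it up, which avoids forgetting the options on one of the writes.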

Relevant documentation can be found here:

  • hadoop-aws
  • hadoop-aws-committers