I am trying to figure out the best way to write data to S3 using (Py)Spark. Reading from an S3 bucket works fine, but writing to it is really slow.
I've started the spark shell like so (including the hadoop-aws
package):
AWS_ACCESS_KEY_ID=<key_id> AWS_SECRET_ACCESS_KEY=<secret_key> pyspark --packages org.apache.hadoop:hadoop-aws:3.2.0
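For reference, the same setup can also be expressed in a standalone script through the SparkSession builder, passing the hadoop-aws package and the credentials via Spark configuration instead of environment variables. This is only a sketch of what I believe is equivalent; the key placeholders and package version are the same as above:
from pyspark.sql import SparkSession

# Pull in hadoop-aws and hand the credentials to the S3A filesystem through
# Hadoop properties (spark.hadoop.* entries are copied into the Hadoop config).
spark = SparkSession.builder \
    .appName('s3-write-example') \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0') \
    .config('spark.hadoop.fs.s3a.access.key', '<key_id>') \
    .config('spark.hadoop.fs.s3a.secret.key', '<secret_key>') \
    .getOrCreate()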
This is the sample application:
# Load several csv files from S3 to a Dataframe (no problems here)
df = spark.read.csv(path='s3a://mybucket/data/*.csv', sep=',')
df.show()
# Some processing
result_df = do_some_processing(df)
result_df.cache()
result_df.show()
# Write to S3
result_df.write.partitionBy('my_column').csv(path='s3a://mybucket/output', sep=',') # This is really slow
When I try to write to S3, I get the following warning:
20/10/28 15:34:02 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
Is there any setting I should change to make writes to S3 efficient? Right now it is really slow: it took about 10 minutes to write 100 small files to S3.
It turns out you have to manually specify the committer (otherwise the default one will be used, which isn't optimized for S3):
# 'bytebuffer' buffers uploads in memory instead of on disk:
# potentially faster, but more memory-intensive.
result_df \
    .write \
    .partitionBy('my_column') \
    .option('fs.s3a.committer.name', 'partitioned') \
    .option('fs.s3a.committer.staging.conflict-mode', 'replace') \
    .option('fs.s3a.fast.upload.buffer', 'bytebuffer') \
    .mode('overwrite') \
    .csv(path='s3a://mybucket/output', sep=',')
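If you want every write in the session to use these settings, the same properties should also work when set once at session creation time via the spark.hadoop.* prefix, which copies them into the Hadoop configuration used by S3A. This is a sketch of what I expect to be equivalent to the per-write options above; I haven't verified this variant:
from pyspark.sql import SparkSession

# Configure the S3A committer once for the whole session instead of per write.
spark = SparkSession.builder \
    .config('spark.hadoop.fs.s3a.committer.name', 'partitioned') \
    .config('spark.hadoop.fs.s3a.committer.staging.conflict-mode', 'replace') \
    .config('spark.hadoop.fs.s3a.fast.upload.buffer', 'bytebuffer') \
    .getOrCreate()

# Subsequent writes no longer need the .option(...) calls:
result_df.write.partitionBy('my_column').mode('overwrite').csv(path='s3a://mybucket/output', sep=',')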
Relevant documentation can be found in the Hadoop guide "Committing work to S3 with the S3A Committers" and on the Spark "Integration with Cloud Infrastructures" page.