How to upload multipart to Amazon S3 asynchronously using the java SDK

In my Java application I need to write data to S3 whose size I don't know in advance, and the data is usually large, so as recommended in the AWS S3 documentation I am using multipart upload via the Java AWS SDK (the low-level API) to write data to the S3 bucket.

In my application I provide S3BufferedOutputStream, an implementation of OutputStream, so that other classes in the app can use this stream to write to the S3 bucket.

I store the data in a buffer, and once the incoming data exceeds the remaining buffer space I upload the buffer's contents as a single UploadPartRequest. Here is the implementation of the write method of S3BufferedOutputStream:

@Override
public void write(byte[] b, int off, int len) throws IOException {
    this.assertOpen();
    int o = off, l = len;
    int size;
    // while the incoming data overflows the remaining buffer space,
    // fill the buffer, upload it as a part, and rewind
    while (l > (size = this.buf.length - position)) {
        System.arraycopy(b, o, this.buf, this.position, size);
        this.position += size;
        flushBufferAndRewind();
        o += size;
        l -= size;
    }
    // whatever remains fits in the buffer; copy it and advance the position
    System.arraycopy(b, o, this.buf, this.position, l);
    this.position += l;
}

The whole implementation is similar to this: code repo
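
For context, flushBufferAndRewind uploads the buffer contents as a single part and resets the position. A minimal sketch of what it might look like (field names such as s3Client, bucketName, objectKey, uploadId, partNumber, and partETags are illustrative, not the exact names in my code):

private void flushBufferAndRewind() {
    UploadPartRequest uploadRequest = new UploadPartRequest()
            .withBucketName(bucketName)
            .withKey(objectKey)
            .withUploadId(uploadId)       // from the InitiateMultipartUploadResult
            .withPartNumber(++partNumber)
            .withPartSize(position)
            .withInputStream(new ByteArrayInputStream(buf, 0, position));

    // this call blocks until the part has been uploaded; this is the bottleneck
    UploadPartResult result = s3Client.uploadPart(uploadRequest);
    partETags.add(result.getPartETag());

    position = 0; // rewind the buffer for the next part
}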

My problem here is that each UploadPartRequest is executed synchronously, so we have to wait for one part to finish uploading before we can upload the next part. And because I am using the low-level AWS S3 API, I cannot benefit from the parallel uploading provided by TransferManager.

Is there a way to achieve parallel uploads using the low-level SDK? Or are there code changes that would let it operate asynchronously without corrupting the uploaded data, while maintaining the order of the data?

asked Jan 30 '26 by Selim Alawwa

1 Answer

Here's some example code from a class that I have. It submits the parts to an ExecutorService and holds onto the returned Future. This is written for the v1 Java SDK; if you're using the v2 SDK you could use an async client rather than the explicit threadpool:

// WARNING: data must not be updated by caller; make a defensive copy if needed
public synchronized void uploadPart(byte[] data, boolean isLastPart)
{
    partNumber++;
    logger.debug("submitting part {} for s3://{}/{}", partNumber, bucket, key);

    final UploadPartRequest request = new UploadPartRequest()
                                      .withBucketName(bucket)
                                      .withKey(key)
                                      .withUploadId(uploadId)
                                      .withPartNumber(partNumber)
                                      .withPartSize(data.length)
                                      .withInputStream(new ByteArrayInputStream(data))
                                      .withLastPart(isLastPart);

    futures.add(
        executor.submit(new Callable<PartETag>()
        {
            @Override
            public PartETag call() throws Exception
            {
                int localPartNumber = request.getPartNumber();
                logger.debug("uploading part {} for s3://{}/{}", localPartNumber, bucket, key);
                UploadPartResult response = client.uploadPart(request);
                String etag = response.getETag();
                logger.debug("uploaded part {} for s3://{}/{}; etag is {}", localPartNumber, bucket, key, etag);
                return new PartETag(localPartNumber, etag);
            }
        }));
}

Note: this method is synchronized to ensure that parts are not submitted out of order.
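
This method relies on some state that isn't shown. A minimal sketch of the supporting fields and their initialization (the class name and constructor here are assumptions, not part of the original class):

// sketch of supporting state; S3MultipartUploader is a hypothetical name
private final AmazonS3 client;
private final ExecutorService executor;
private final String bucket;
private final String key;
private final String uploadId;
private final List<Future<PartETag>> futures = new ArrayList<>();
private int partNumber = 0;

public S3MultipartUploader(AmazonS3 client, ExecutorService executor, String bucket, String key)
{
    this.client = client;
    this.executor = executor;
    this.bucket = bucket;
    this.key = key;
    // initiating the multipart upload returns the ID that ties the parts together
    this.uploadId = client.initiateMultipartUpload(
                        new InitiateMultipartUploadRequest(bucket, key))
                        .getUploadId();
}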

Once you've submitted all of the parts, you use this method to wait for them to finish and then complete the upload:

public void complete()
{
    logger.debug("waiting for upload tasks of s3://{}/{}", bucket, key);
    List<PartETag> partTags = new ArrayList<>();
    for (Future<PartETag> future : futures)
    {
        try
        {
            partTags.add(future.get());
        }
        catch (Exception e)
        {
            throw new RuntimeException(String.format("failed to complete upload task for s3://%s/%s", bucket, key), e);
        }
    }

    logger.debug("completing multi-part upload for s3://{}/{}", bucket, key);
    CompleteMultipartUploadRequest request = new CompleteMultipartUploadRequest()
                                              .withBucketName(bucket)
                                              .withKey(key)
                                              .withUploadId(uploadId)
                                              .withPartETags(partTags);
    client.completeMultipartUpload(request);
    logger.debug("completed multi-part upload for s3://{}/{}", bucket, key);
}
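
As for the v2 alternative mentioned above: the v2 async client returns a CompletableFuture from uploadPart directly, so no explicit threadpool is needed. A rough sketch, not drop-in code (bucket, key, and uploadId are assumed to be in scope; the classes come from software.amazon.awssdk.services.s3):

// v2 SDK: parts upload in parallel without an explicit ExecutorService
CompletableFuture<CompletedPart> uploadPartAsync(S3AsyncClient client, byte[] data, int partNumber)
{
    UploadPartRequest request = UploadPartRequest.builder()
                                .bucket(bucket)
                                .key(key)
                                .uploadId(uploadId)
                                .partNumber(partNumber)
                                .build();
    return client.uploadPart(request, AsyncRequestBody.fromBytes(data))
                 .thenApply(response -> CompletedPart.builder()
                                                     .partNumber(partNumber)
                                                     .eTag(response.eTag())
                                                     .build());
}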

You'll also need an abort() method that cancels outstanding parts and aborts the upload. This, and the rest of the class, are left as an exercise for the reader.
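
That said, a minimal sketch of what abort() might look like (my interpretation, using the same fields as above):

public void abort()
{
    // cancel parts that haven't started; parts already uploading will run to completion
    for (Future<PartETag> future : futures)
    {
        future.cancel(false);
    }

    logger.debug("aborting multi-part upload for s3://{}/{}", bucket, key);
    client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
}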

answered Jan 31 '26 by kdgregory


