
Upload an InputStream to AWS S3 asynchronously (non-blocking) using the AWS SDK for Java, version 2

When I upload an InputStream object to S3 synchronously (the blocking way), it works.

S3Client s3Client = S3Client.builder().build();
s3Client.putObject(objectRequest, RequestBody.fromInputStream(inputStream,STREAM_SIZE));

But when I try the same with S3AsyncClient, there is no .fromInputStream method on AsyncRequestBody.

S3AsyncClient s3AsyncClient = S3AsyncClient.builder().build();
s3AsyncClient.putObject(objectRequest, AsyncRequestBody.fromInputStream(inputStream,STREAM_SIZE)); // error no method named 'fromInputStream'

And I can't use .fromByteBuffer, as it would load the entire stream into memory, which I don't want.

I am interested in why there is no method to read from an InputStream in AsyncRequestBody. Are there any alternatives?

Ketan Chaudhari asked May 02 '26 01:05



2 Answers

After some research, this is what I found:

  1. An InputStream is blocking by nature, so whenever you read from it some thread will block. In @jakobeha's answer, 'toByteReadChannel' returns a channel backed by blocking reads. Performance-wise, that is roughly equivalent to running the synchronous S3Client.putObject() with RequestBody.fromInputStream() on a background thread, which you can do by wrapping the call in a CompletableFuture.
  2. Other "AsyncRequestBody" types such as "FileAsyncRequestBody" use 'nio' (non-blocking I/O) with callbacks. That may be why the AWS team has not included "fromInputStream" in "AsyncRequestBody": it simply cannot be done in a fully non-blocking way, and it would have caused confusion.
  3. If you want a highly scalable solution, the best approach is to avoid InputStream altogether: find where the InputStream originates and use an alternative that supports non-blocking channels. In my case I used Java Flow, converted it to a 'Publisher', and passed that to AsyncRequestBody.fromPublisher().
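To make point 1 concrete, here is a minimal sketch of pushing a blocking upload onto a background thread with CompletableFuture. It uses only the JDK so that it runs on its own: `BlockingUpload` and `drain` are hypothetical stand-ins for the real blocking call (for example `s3Client.putObject(objectRequest, RequestBody.fromInputStream(inputStream, STREAM_SIZE))`), not part of the AWS SDK.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BackgroundUpload {
    /** Hypothetical stand-in for a blocking upload such as the synchronous S3Client.putObject call. */
    interface BlockingUpload {
        long upload(InputStream in) throws IOException;
    }

    /** Runs the blocking upload on the given executor so the calling thread is not blocked. */
    static CompletableFuture<Long> uploadAsync(InputStream in, BlockingUpload upload, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            // The stream is closed once the upload finishes or fails.
            try (InputStream stream = in) {
                return upload.upload(stream);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, executor);
    }

    /** Placeholder "upload": drains the stream in 8 KiB chunks and returns the byte count. */
    static long drain(InputStream in) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            total += read;
        }
        return total;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            InputStream in = new ByteArrayInputStream(new byte[20_000]);
            CompletableFuture<Long> result = uploadAsync(in, BackgroundUpload::drain, executor);
            System.out.println("bytes uploaded: " + result.join()); // prints "bytes uploaded: 20000"
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that a thread still blocks for the duration of each upload; the wrapper only moves that blocking off the caller's thread, which is the trade-off described in point 1.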
Ketan Chaudhari answered May 04 '26 15:05



For anyone using Kotlin and coroutines: here is a Kotlin wrapper that creates an AsyncRequestBody from an InputStream. By default the wrapper runs in a background thread, but you can pass an explicit CoroutineScope to run it inside your own coroutine, which avoids creating a separate thread.

import io.ktor.util.cio.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import software.amazon.awssdk.core.async.AsyncRequestBody
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.*

@OptIn(DelicateCoroutinesApi::class)
class StreamAsyncRequestBody(
  inputStream: InputStream,
  private val coroutineScope: CoroutineScope = GlobalScope
) :
  AsyncRequestBody {
  private val inputChannel =
    inputStream.toByteReadChannel(context = coroutineScope.coroutineContext)

  override fun subscribe(subscriber: Subscriber<in ByteBuffer>) {
    subscriber.onSubscribe(object : Subscription {
      private var done: Boolean = false

      override fun request(n: Long) {
        if (!done) {
          if (inputChannel.isClosedForRead) {
            complete()
          } else {
            coroutineScope.launch {
              inputChannel.read {
                subscriber.onNext(it)
                if (inputChannel.isClosedForRead) {
                  complete()
                }
              }
            }
          }
        }
      }

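      private fun complete() {
        // Flip the flag first so that onComplete is signalled at most once,
        // even if two reads observe the closed channel concurrently.
        val alreadyDone = synchronized(this) { done.also { done = true } }
        if (!alreadyDone) subscriber.onComplete()
      }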

      override fun cancel() {
        synchronized(this) {
          done = true
        }
      }
    })
  }

  override fun contentLength(): Optional<Long> = Optional.empty()
}

Example usage:

suspend fun s3Put(objectRequest: PutObjectRequest, inputStream: InputStream) = coroutineScope {
  // Assumes s3Client is an S3AsyncClient and await() is kotlinx.coroutines.future.await
  s3Client.putObject(objectRequest, StreamAsyncRequestBody(inputStream, this)).await()
}

If you use Java, you will need to write your own wrapper and use a different coroutine library. Alternatively, you could create an Executor with a fixed number of threads: if too many uploads run at once they will block one another, but they won't create too many threads and stall the entire program.
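As a rough idea of what such a Java wrapper could look like, here is a sketch of a publisher that reads the InputStream in chunks on a caller-supplied Executor (for instance a fixed thread pool) and only reads when the subscriber has asked for more. It is JDK-only and uses `java.util.concurrent.Flow`; `InputStreamPublisher` and `countBytes` are names made up for this example. The AWS SDK's AsyncRequestBody.fromPublisher() takes an `org.reactivestreams.Publisher<ByteBuffer>`, so an adapter (such as the `FlowAdapters` class in the reactive-streams library) would still be needed; that step is an assumption and not shown. The sketch supports a single subscriber and leaves closing the stream on cancel to the caller.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: publishes an InputStream as ByteBuffer chunks; supports one subscriber only. */
class InputStreamPublisher implements Flow.Publisher<ByteBuffer> {
    private final InputStream in;
    private final int chunkSize;
    private final Executor executor;

    InputStreamPublisher(InputStream in, int chunkSize, Executor executor) {
        this.in = in;
        this.chunkSize = chunkSize;
        this.executor = executor;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private volatile boolean done;
            private volatile Throwable failure;

            @Override
            public void request(long n) {
                if (n <= 0) failure = new IllegalArgumentException("demand must be positive");
                // Saturating add; a drain task is scheduled only when demand goes from 0 to positive.
                long previous = demand.getAndAccumulate(Math.max(n, 1),
                        (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                if (previous == 0) executor.execute(this::drain);
            }

            @Override
            public void cancel() {
                done = true; // the caller remains responsible for closing the stream
            }

            private void drain() {
                do {
                    if (done) return;
                    try {
                        if (failure != null) {
                            done = true;
                            subscriber.onError(failure);
                            return;
                        }
                        byte[] chunk = new byte[chunkSize];
                        int read = in.read(chunk); // blocking read, confined to the executor thread
                        if (read == -1) {
                            done = true;
                            in.close();
                            subscriber.onComplete();
                            return;
                        }
                        subscriber.onNext(ByteBuffer.wrap(chunk, 0, read));
                    } catch (IOException e) {
                        done = true;
                        subscriber.onError(e);
                        return;
                    }
                } while (demand.decrementAndGet() > 0);
            }
        });
    }

    /** Demo helper: subscribes, requests one chunk at a time, and returns the total bytes seen. */
    static long countBytes(InputStream in, int chunkSize, Executor executor) {
        AtomicLong total = new AtomicLong();
        CountDownLatch finished = new CountDownLatch(1);
        new InputStreamPublisher(in, chunkSize, executor).subscribe(new Flow.Subscriber<ByteBuffer>() {
            private Flow.Subscription subscription;
            public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            public void onNext(ByteBuffer chunk) { total.addAndGet(chunk.remaining()); subscription.request(1); }
            public void onError(Throwable t) { finished.countDown(); }
            public void onComplete() { finished.countDown(); }
        });
        try {
            finished.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return total.get();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        long total = countBytes(new ByteArrayInputStream(new byte[20_000]), 8192, pool);
        System.out.println("bytes published: " + total); // prints "bytes published: 20000"
        pool.shutdown();
    }
}
```

With a fixed pool, at most that many streams are being read at any moment; further uploads wait for a free thread instead of spawning new ones.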


EDIT: Fixed the code. I didn't test the previous version, but I tested this version with a few uploads and it worked. Of course, that doesn't mean it's bug-free :)

jakobeha answered May 04 '26 13:05



