When I upload an InputStream to S3 synchronously (the blocking way), it works:
S3Client s3Client = S3Client.builder().build();
s3Client.putObject(objectRequest, RequestBody.fromInputStream(inputStream,STREAM_SIZE));
But when I try the same with S3AsyncClient, there is no fromInputStream method on AsyncRequestBody:
S3AsyncClient s3AsyncClient = S3AsyncClient.builder().build();
s3AsyncClient.putObject(objectRequest, AsyncRequestBody.fromInputStream(inputStream,STREAM_SIZE)); // error no method named 'fromInputStream'
I can't use fromByteBuffer either, because it would load the entire stream into memory, which I want to avoid.
Why does AsyncRequestBody have no method for reading from an InputStream, and are there any alternatives?
After some research, this is what I found:
For anyone using Kotlin and coroutines: here is a Kotlin wrapper that builds an AsyncRequestBody from an InputStream. By default the wrapper runs on a background thread, but you can pass an explicit CoroutineScope to run it inside your own coroutine, which avoids creating a separate thread.
import io.ktor.util.cio.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import software.amazon.awssdk.core.async.AsyncRequestBody
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.*

@OptIn(DelicateCoroutinesApi::class)
class StreamAsyncRequestBody(
    inputStream: InputStream,
    private val coroutineScope: CoroutineScope = GlobalScope
) : AsyncRequestBody {

    // Bridge the blocking stream into a channel, using the given scope's coroutine context.
    private val inputChannel =
        inputStream.toByteReadChannel(context = coroutineScope.coroutineContext)

    override fun subscribe(subscriber: Subscriber<in ByteBuffer>) {
        subscriber.onSubscribe(object : Subscription {
            // Read from several threads, so it must be visible across them.
            @Volatile
            private var done: Boolean = false

            override fun request(n: Long) {
                if (!done) {
                    if (inputChannel.isClosedForRead) {
                        complete()
                    } else {
                        // Read the next available chunk in a coroutine on the given scope.
                        coroutineScope.launch {
                            inputChannel.read {
                                subscriber.onNext(it)
                                if (inputChannel.isClosedForRead) {
                                    complete()
                                }
                            }
                        }
                    }
                }
            }

            private fun complete() {
                // Flip the flag before signalling, so onComplete fires at most once.
                val alreadyDone = synchronized(this) { done.also { done = true } }
                if (!alreadyDone) subscriber.onComplete()
            }

            override fun cancel() {
                synchronized(this) {
                    done = true
                }
            }
        })
    }

    // The length of an arbitrary stream is not known up front.
    override fun contentLength(): Optional<Long> = Optional.empty()
}
Example usage:
suspend fun s3Put(objectRequest: PutObjectRequest, inputStream: InputStream) = coroutineScope {
    // s3Client is an S3AsyncClient; `this` is the CoroutineScope the body reads in
    s3Client.putObject(objectRequest, StreamAsyncRequestBody(inputStream, this))
}
If you use Java, you will need to write your own wrapper on top of a different asynchronous library. Alternatively, you could create an Executor with a fixed number of threads: if too many uploads run at once they will wait on one another, but they won't spawn an unbounded number of threads and stall the entire program.
EDIT: Fixed the code. I had not tested the previous version; I tested this version with a few uploads and it worked. Of course, that doesn't mean it's bug-free :)
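For Java, the fixed-size Executor idea mentioned above could look roughly like the sketch below. It is only an illustration under some assumptions: the names InputStreamPublisher, InputStreamPublisherDemo, collect and roundTrip are made up for this example, and it uses the JDK's java.util.concurrent.Flow interfaces (JDK 9+) as a stand-in for the org.reactivestreams interfaces that AsyncRequestBody is built on, so that it runs without any dependencies. A real wrapper would implement AsyncRequestBody (including contentLength()) instead of Flow.Publisher.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: publishes a stream as chunks; blocking reads run on the executor. */
final class InputStreamPublisher implements Flow.Publisher<ByteBuffer> {
    private final InputStream in;
    private final ExecutorService executor;
    private final int chunkSize; // assumed to be > 0

    InputStreamPublisher(InputStream in, ExecutorService executor, int chunkSize) {
        this.in = in; this.executor = executor; this.chunkSize = chunkSize;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicLong demand = new AtomicLong();
            private final AtomicBoolean draining = new AtomicBoolean();
            private volatile boolean done;

            @Override public void request(long n) {
                if (done || n <= 0) return;
                // Accumulate demand, saturating at Long.MAX_VALUE instead of overflowing.
                demand.accumulateAndGet(n, (a, b) -> a + b < 0 ? Long.MAX_VALUE : a + b);
                schedule();
            }

            // The caller still owns the stream and is responsible for closing it.
            @Override public void cancel() { done = true; }

            private void schedule() {
                // At most one drain task runs at a time, so onNext calls never overlap.
                if (draining.compareAndSet(false, true)) executor.execute(this::drain);
            }

            private void drain() {
                try {
                    while (!done && demand.get() > 0) {
                        byte[] chunk = new byte[chunkSize];
                        int read = in.read(chunk); // blocks only this pool thread
                        if (read < 0) {
                            done = true;
                            subscriber.onComplete();
                            return;
                        }
                        demand.decrementAndGet();
                        subscriber.onNext(ByteBuffer.wrap(chunk, 0, read));
                    }
                } catch (IOException e) {
                    done = true;
                    subscriber.onError(e);
                    return;
                } finally {
                    draining.set(false);
                }
                // Demand may have arrived between the loop exit and the flag reset.
                if (!done && demand.get() > 0) schedule();
            }
        });
    }
}

final class InputStreamPublisherDemo {
    /** Subscribes, requests one chunk at a time, and blocks until the stream is exhausted. */
    static byte[] collect(Flow.Publisher<ByteBuffer> publisher) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        publisher.subscribe(new Flow.Subscriber<ByteBuffer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(ByteBuffer b) {
                out.write(b.array(), b.arrayOffset() + b.position(), b.remaining());
                subscription.request(1);
            }
            @Override public void onError(Throwable t) { result.completeExceptionally(t); }
            @Override public void onComplete() { result.complete(out.toByteArray()); }
        });
        return result.join();
    }

    /** Streams data through the publisher on a small fixed pool and returns what arrived. */
    static byte[] roundTrip(byte[] data, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(2); // bounded number of reader threads
        try {
            return collect(new InputStreamPublisher(new ByteArrayInputStream(data), pool, chunkSize));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        byte[] data = "hello, async upload".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(roundTrip(data, 4), StandardCharsets.UTF_8));
    }
}
```

The pool size caps how many streams can be read at the same time: extra uploads simply queue behind the running ones rather than each starting a thread of their own. In a real application the pool would be shared across uploads and not created per call as roundTrip does here.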